improvements to reorg support

This commit is contained in:
Lex Berezhny 2018-08-16 21:41:22 -04:00
parent e7dcd9216b
commit dabedc17d0
4 changed files with 57 additions and 28 deletions

View file

@ -29,7 +29,10 @@ class BitcoinHeadersTestCase(unittest.TestCase):
header_file.write(response.read())
if os.path.getsize(self.header_file_name) != self.HEADER_BYTES:
os.remove(self.header_file_name)
raise Exception("Downloaded headers for testing are not the correct number of bytes.")
raise Exception(
"Downloaded headers for testing are not the correct number of bytes. "
"They were deleted. Try running the tests again."
)
def get_bytes(self, upto: int = -1, after: int = 0) -> bytes:
with open(self.header_file_name, 'rb') as headers:

View file

@ -241,6 +241,12 @@ class BaseDatabase(SQLiteMixin):
def release_outputs(self, txos):
return self.reserve_outputs(txos, is_reserved=False)
def rewind_blockchain(self, above_height):
# TODO:
# 1. delete transactions above_height
# 2. update address histories removing deleted TXs
return defer.succeed(True)
@defer.inlineCallbacks
def get_transaction(self, txid):
result = yield self.run_query(

View file

@ -6,7 +6,6 @@ from binascii import hexlify
from twisted.internet import threads, defer
from torba.stream import StreamController
from torba.util import ArithUint256
from torba.hash import double_sha256
@ -37,8 +36,6 @@ class BaseHeaders:
self.io = BytesIO()
self.path = path
self._size: Optional[int] = None
self._on_change_controller = StreamController()
self.on_changed = self._on_change_controller.stream
self._header_connect_lock = defer.DeferredLock()
def open(self):
@ -106,7 +103,7 @@ class BaseHeaders:
try:
for height, chunk in self._iterate_chunks(start, headers):
try:
# validate_chunk() is CPU bound on large chunks
# validate_chunk() is CPU bound and reads previous chunks from file system
yield threads.deferToThread(self.validate_chunk, height, chunk)
except InvalidHeader as e:
bail = True
@ -120,7 +117,6 @@ class BaseHeaders:
# the goal here is mainly to ensure we're definitely flush()'ing
yield threads.deferToThread(self.io.flush)
self._size = None
self._on_change_controller.add(written)
added += written
if bail:
break

View file

@ -44,6 +44,10 @@ class TransactionEvent(namedtuple('TransactionEvent', ('address', 'tx', 'height'
pass
class BlockHeightEvent(namedtuple('BlockHeightEvent', ('height', 'change'))):
pass
class BaseLedger(metaclass=LedgerRegistry):
name: str
@ -235,36 +239,54 @@ class BaseLedger(metaclass=LedgerRegistry):
yield self.headers.close()
@defer.inlineCallbacks
def update_headers(self, height=None, headers=None, count=1, subscription_update=False):
def update_headers(self, height=None, headers=None, subscription_update=False):
rewound = 0
while True:
height = len(self.headers) if height is None else height
if headers is None:
if height is None or height > len(self.headers):
# sometimes header subscription updates are for a header in the future
# which can't be connected, so we do a normal header sync instead
height = len(self.headers)
headers = None
subscription_update = False
if not headers:
header_response = yield self.network.get_headers(height, 2001)
count = header_response['count']
headers = header_response['hex']
if count <= 0:
if not headers:
# Nothing to do, network thinks we're already at the latest height.
return
added = yield self.headers.connect(height, unhexlify(headers))
if added > 0:
self._on_header_controller.add(added)
height += added
self._on_header_controller.add(
BlockHeightEvent(self.headers.height, added))
if subscription_update and added == count:
if rewound > 0:
# we started rewinding blocks and apparently found
# a new chain
rewound = 0
yield self.db.rewind_blockchain(height)
if subscription_update:
# subscription updates are for latest header already
# so we don't need to check if there are newer / more
# on another loop of update_headers(), just return instead
return
if added == 0:
# headers were invalid, start rewinding
elif added == 0:
# we had headers to connect but none got connected, probably a reorganization
height -= 1
rewound += 1
log.warning("Experiencing Blockchain Reorganization: Undoing header.")
log.warning(
"Blockchain Reorganization: attempting rewind to height {} from starting height {}"
.format(height, height+rewound)
)
else:
# added all headers, see if there are more
height += added
raise IndexError("headers.connect() returned negative number ({})".format(added))
if height < 0:
raise IndexError(
@ -272,17 +294,19 @@ class BaseLedger(metaclass=LedgerRegistry):
"Something is very wrong. Maybe you are on the wrong blockchain?"
)
if rewound >= 50:
if rewound >= 100:
raise IndexError(
"Blockchain reorganization dropped {} headers. This is highly unusual. "
"Will not continue to attempt reorganizing."
.format(rewound)
"Will not continue to attempt reorganizing. Please, delete the ledger "
"synchronization directory inside your wallet directory (folder: '{}') and "
"restart the program to synchronize from scratch."
.format(rewound, self.get_id())
)
headers = None
headers = None # ready to download some more headers
# if we made it this far and this was a subscription_update
# it means something was wrong and now we're doing a more
# it means something went wrong and now we're doing a more
# robust sync, turn off subscription update shortcut
subscription_update = False