diff --git a/tests/unit/test_headers.py b/tests/unit/test_headers.py
index 00f628c16..92654bde5 100644
--- a/tests/unit/test_headers.py
+++ b/tests/unit/test_headers.py
@@ -27,9 +27,12 @@ class BitcoinHeadersTestCase(unittest.TestCase):
             req.add_header('Range', 'bytes=0-{}'.format(self.HEADER_BYTES-1))
             with urlopen(req) as response, open(self.header_file_name, 'wb') as header_file:
                 header_file.write(response.read())
-        if os.path.getsize(self.header_file_name) != self.HEADER_BYTES:
-            os.remove(self.header_file_name)
-            raise Exception("Downloaded headers for testing are not the correct number of bytes.")
+        if os.path.getsize(self.header_file_name) != self.HEADER_BYTES:
+            os.remove(self.header_file_name)
+            raise Exception(
+                "Downloaded headers for testing are not the correct number of bytes. "
+                "They were deleted. Try running the tests again."
+            )
 
     def get_bytes(self, upto: int = -1, after: int = 0) -> bytes:
         with open(self.header_file_name, 'rb') as headers:
diff --git a/torba/basedatabase.py b/torba/basedatabase.py
index 6f0572553..7af731f12 100644
--- a/torba/basedatabase.py
+++ b/torba/basedatabase.py
@@ -241,6 +241,12 @@ class BaseDatabase(SQLiteMixin):
     def release_outputs(self, txos):
         return self.reserve_outputs(txos, is_reserved=False)
 
+    def rewind_blockchain(self, above_height):
+        # TODO:
+        # 1. delete transactions above_height
+        # 2. update address histories removing deleted TXs
+        return defer.succeed(True)
+
     @defer.inlineCallbacks
     def get_transaction(self, txid):
         result = yield self.run_query(
diff --git a/torba/baseheader.py b/torba/baseheader.py
index 442407c84..75be3c52e 100644
--- a/torba/baseheader.py
+++ b/torba/baseheader.py
@@ -6,7 +6,6 @@ from binascii import hexlify
 
 from twisted.internet import threads, defer
 
-from torba.stream import StreamController
 from torba.util import ArithUint256
 from torba.hash import double_sha256
 
@@ -37,8 +36,6 @@ class BaseHeaders:
         self.io = BytesIO()
         self.path = path
         self._size: Optional[int] = None
-        self._on_change_controller = StreamController()
-        self.on_changed = self._on_change_controller.stream
         self._header_connect_lock = defer.DeferredLock()
 
     def open(self):
@@ -106,7 +103,7 @@ class BaseHeaders:
         try:
             for height, chunk in self._iterate_chunks(start, headers):
                 try:
-                    # validate_chunk() is CPU bound on large chunks
+                    # validate_chunk() is CPU bound and reads previous chunks from file system
                    yield threads.deferToThread(self.validate_chunk, height, chunk)
                except InvalidHeader as e:
                    bail = True
@@ -120,7 +117,6 @@ class BaseHeaders:
                # the goal here is mainly to ensure we're definitely flush()'ing
                yield threads.deferToThread(self.io.flush)
                self._size = None
-                self._on_change_controller.add(written)
                added += written
                if bail:
                    break
diff --git a/torba/baseledger.py b/torba/baseledger.py
index bde70f59c..560c9842d 100644
--- a/torba/baseledger.py
+++ b/torba/baseledger.py
@@ -44,6 +44,10 @@ class TransactionEvent(namedtuple('TransactionEvent', ('address', 'tx', 'height'
     pass
 
 
+class BlockHeightEvent(namedtuple('BlockHeightEvent', ('height', 'change'))):
+    pass
+
+
 class BaseLedger(metaclass=LedgerRegistry):
 
     name: str
@@ -235,36 +239,54 @@ class BaseLedger(metaclass=LedgerRegistry):
         yield self.headers.close()
 
     @defer.inlineCallbacks
-    def update_headers(self, height=None, headers=None, count=1, subscription_update=False):
+    def update_headers(self, height=None, headers=None, subscription_update=False):
         rewound = 0
         while True:
 
-            height = len(self.headers) if height is None else height
-            if headers is None:
+            if height is None or height > len(self.headers):
+                # sometimes header subscription updates are for a header in the future
+                # which can't be connected, so we do a normal header sync instead
+                height = len(self.headers)
+                headers = None
+                subscription_update = False
+
+            if not headers:
                 header_response = yield self.network.get_headers(height, 2001)
-                count = header_response['count']
                 headers = header_response['hex']
 
-            if count <= 0:
+            if not headers:
+                # Nothing to do, network thinks we're already at the latest height.
                 return
 
             added = yield self.headers.connect(height, unhexlify(headers))
             if added > 0:
-                self._on_header_controller.add(added)
+                height += added
+                self._on_header_controller.add(
+                    BlockHeightEvent(self.headers.height, added))
 
-            if subscription_update and added == count:
-                # subscription updates are for latest header already
-                # so we don't need to check if there are newer / more
-                return
+                if rewound > 0:
+                    # we started rewinding blocks and apparently found
+                    # a new chain
+                    rewound = 0
+                    yield self.db.rewind_blockchain(height)
 
-            if added == 0:
-                # headers were invalid, start rewinding
+                if subscription_update:
+                    # subscription updates are for latest header already
+                    # so we don't need to check if there are newer / more
+                    # on another loop of update_headers(), just return instead
+                    return
+
+            elif added == 0:
+                # we had headers to connect but none got connected, probably a reorganization
                 height -= 1
                 rewound += 1
-                log.warning("Experiencing Blockchain Reorganization: Undoing header.")
+                log.warning(
+                    "Blockchain Reorganization: attempting rewind to height {} from starting height {}"
+                    .format(height, height+rewound)
+                )
+
             else:
-                # added all headers, see if there are more
-                height += added
+                raise IndexError("headers.connect() returned negative number ({})".format(added))
 
             if height < 0:
                 raise IndexError(
@@ -272,17 +294,19 @@ class BaseLedger(metaclass=LedgerRegistry):
                     "Something is very wrong. Maybe you are on the wrong blockchain?"
                 )
 
-            if rewound >= 50:
+            if rewound >= 100:
                 raise IndexError(
                     "Blockchain reorganization dropped {} headers. This is highly unusual. "
-                    "Will not continue to attempt reorganizing."
-                    .format(rewound)
+                    "Will not continue to attempt reorganizing. Please, delete the ledger "
+                    "synchronization directory inside your wallet directory (folder: '{}') and "
+                    "restart the program to synchronize from scratch."
+                    .format(rewound, self.get_id())
                 )
 
-            headers = None
+            headers = None  # ready to download some more headers
 
             # if we made it this far and this was a subscription_update
-            # it means something was wrong and now we're doing a more
+            # it means something went wrong and now we're doing a more
            # robust sync, turn off subscription update shortcut
            subscription_update = False
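
Note (not part of the patch): the new rewind_blockchain() is still a stub that only documents its intent in the TODO. As a rough illustration of what those two steps could look like against a plain SQLite connection, here is a minimal sketch; the table and column names (tx.height, txo.txid, txo.address, pubkey_address.history) are assumptions made for the example and may not match torba's actual schema, and the real method would also go through the Deferred-based database layer rather than a raw connection.

    import sqlite3

    def rewind_blockchain(conn: sqlite3.Connection, above_height: int) -> None:
        # Hypothetical schema: tx(txid, height), txo(txid, address), pubkey_address(address, history).
        cursor = conn.cursor()
        # Collect the transactions that are being rolled back.
        txids = [row[0] for row in cursor.execute(
            "SELECT txid FROM tx WHERE height > ?", (above_height,))]
        if not txids:
            return
        placeholders = ",".join("?" * len(txids))
        # 2. update address histories: clear the cached history of every address
        #    that referenced a deleted TX so it gets re-fetched on the next sync.
        cursor.execute(
            "UPDATE pubkey_address SET history = NULL WHERE address IN "
            "(SELECT address FROM txo WHERE txid IN ({}))".format(placeholders), txids)
        # 1. delete transactions above_height, along with their outputs.
        cursor.execute("DELETE FROM txo WHERE txid IN ({})".format(placeholders), txids)
        cursor.execute("DELETE FROM tx WHERE height > ?", (above_height,))
        conn.commit()

Clearing the history rather than editing it keeps the sketch simple: the next address poll repopulates it from the server, which is acceptable for the rare reorganization case.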
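For context on the subscription_update flag in the reworked update_headers(), a hedged sketch of how a header subscription callback might drive it; the callback name and the payload keys ('height', 'hex') are assumptions for illustration, not part of this patch.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def on_header_notification(ledger, notification):
        # Assumed payload shape: {'height': <int>, 'hex': <raw header as hex>}.
        # subscription_update=True lets update_headers() return after the single
        # pushed header connects cleanly, instead of looping to look for more.
        yield ledger.update_headers(
            height=notification['height'],
            headers=notification['hex'],
            subscription_update=True
        )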