From 1298e0e725219648f1e42ed00a43f695ac914e3a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 29 May 2020 14:22:32 -0400 Subject: [PATCH] reproduce sync error --- lbry/wallet/ledger.py | 6 +- lbry/wallet/server/block_processor.py | 2 + lbry/wallet/server/session.py | 14 ++++- .../blockchain/test_claim_commands.py | 56 +++++++++++++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 9f8747952..69cc54b36 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -167,6 +167,7 @@ class Ledger(metaclass=LedgerRegistry): self.coin_selection_strategy = None self._known_addresses_out_of_sync = set() + self.went_out_of_sync = asyncio.Queue() self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) self._balance_cache = pylru.lrucache(100000) @@ -589,11 +590,12 @@ class Ledger(metaclass=LedgerRegistry): "******", address, remote_status, len(remote_history), len(remote_set), local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)), - "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]), - "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]) + "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]), + "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]) ) # log.warning("local: %s", local_history) # log.warning("remote: %s", remote_history) + self.went_out_of_sync.put_nowait(address) self._known_addresses_out_of_sync.add(address) return False else: diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 437b24f18..3ad4e925a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -756,6 +756,8 @@ class LBRYBlockProcessor(BlockProcessor): self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.sql: SQLDB = self.db.sql self.timer = Timer('BlockProcessor') + self.block_notify = asyncio.Event() + self.block_notify.set() def advance_blocks(self, blocks): self.sql.begin() diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 39ce8e7e3..0831ed019 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -915,10 +915,15 @@ class LBRYElectrumX(SessionBase): def sub_count(self): return len(self.hashX_subs) + UGLY_COUNT = 0 + async def notify(self, touched, height_changed): """Notify the client about changes to touched addresses (from mempool updates or new blocks) and height. """ + + self.UGLY_COUNT += 1 + if height_changed and self.subscribe_headers: args = (await self.subscribe_headers_result(), ) try: @@ -934,6 +939,11 @@ class LBRYElectrumX(SessionBase): for hashX in touched: alias = self.hashX_subs[hashX] + if self.UGLY_COUNT == 25: + print('sleeping for ', hashX) + if not self.bp.block_notify.is_set(): + await self.bp.block_notify.wait() + await asyncio.sleep(3) status = await self.address_status(hashX) changed[alias] = status @@ -955,8 +965,10 @@ class LBRYElectrumX(SessionBase): else: method = 'blockchain.address.subscribe' start = time.perf_counter() + if not self.bp.block_notify.is_set(): + await self.bp.block_notify.wait() t = asyncio.create_task(self.send_notification(method, (alias, status))) - t.add_done_callback(lambda _: self.logger.info("sent notification to %s in %s", alias, time.perf_counter() - start)) + t.add_done_callback(lambda _: self.logger.warning("sent notification to %s in %s", alias, time.perf_counter() - start)) if changed: es = '' if len(changed) == 1 else 'es' diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 4873e0cde..ee0d2ca0e 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -40,6 +40,8 @@ class ClaimTestCase(CommandTestCase): class ClaimSearchCommand(ClaimTestCase): + VERBOSITY = logging.WARNING + async def create_channel(self): self.channel = await self.channel_create('@abc', '1.0') self.channel_id = self.get_claim_id(self.channel) @@ -157,6 +159,60 @@ class ClaimSearchCommand(ClaimTestCase): await self.stream_abandon(txid=signed2['txid'], nout=0) await self.assertFindsClaims([], channel_ids=[channel_id2]) + async def test_break_it(self): + await self.generate(5) + address = await self.account.receiving.get_or_create_usable_address() + sendtxid = await self.blockchain.send_to_address(address, 1) + await self.confirm_tx(sendtxid) + address = await self.account.receiving.get_or_create_usable_address() + sendtxid = await self.blockchain.send_to_address(address, 1) + await self.confirm_tx(sendtxid) + address = await self.account.receiving.get_or_create_usable_address() + sendtxid = await self.blockchain.send_to_address(address, 1) + await self.confirm_tx(sendtxid) + address = await self.account.receiving.get_or_create_usable_address() + sendtxid = await self.blockchain.send_to_address(address, 1) + await self.confirm_tx(sendtxid) + await self.generate(7) + + async def _doit(n): + try: + await self.daemon.jsonrpc_channel_create( + name=f'@arena{n}', bid='0.1', blocking=True + ) + except InsufficientFundsError: + pass + + def doit(n): + asyncio.create_task(_doit(n)) + + async def break_it(): + count = 0 + for _ in range(4): + for _ in range(10): + count += 1 + if not count % 7: + asyncio.create_task(self.generate(1)) + doit(count) + if self.ledger._known_addresses_out_of_sync: + print('out of sync', self.ledger._known_addresses_out_of_sync) + await asyncio.sleep(1) + await self.generate(1) + + bp = self.conductor.spv_node.server.bp + break_task = asyncio.create_task(break_it()) + address = await self.ledger.went_out_of_sync.get() + bp.block_notify.clear() + print('%s is out of sync' % address) + with self.assertRaises(InsufficientFundsError): + await self.daemon.jsonrpc_channel_create( + name=f'@derp', bid='0.1', blocking=True + ) + self.assertTrue(False) + print("woohoo") + if not break_task.done(): + break_task.cancel() + async def test_pagination(self): await self.create_channel() await self.create_lots_of_streams()