From a08cbf412dd829a9b1a838858fca19ea008e0ea8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 23 Mar 2020 12:26:07 -0300 Subject: [PATCH 1/2] on_ready.wait so we start after tx syncs --- lbry/testcase.py | 1 - lbry/wallet/ledger.py | 1 + lbry/wallet/orchstr8/node.py | 1 - tests/integration/blockchain/test_resolve_command.py | 1 - tests/integration/blockchain/test_sync.py | 1 - 5 files changed, 1 insertion(+), 4 deletions(-) diff --git a/lbry/testcase.py b/lbry/testcase.py index 667fab5e1..56c9cb4c6 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -370,7 +370,6 @@ class CommandTestCase(IntegrationTestCase): ) self.extra_wallet_node_port += 1 await wallet_node.start(self.conductor.spv_node, seed=seed) - await wallet_node.ledger.on_ready.first self.extra_wallet_nodes.append(wallet_node) upload_dir = os.path.join(wallet_node.data_path, 'uploads') diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 907aec8ba..8aa384f10 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -323,6 +323,7 @@ class Ledger(metaclass=LedgerRegistry): else: await self._report_state() self.on_transaction.listen(self._reset_balance_cache) + await self.on_ready.first async def join_network(self, *_): log.info("Subscribing and updating accounts.") diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 411b57599..a94a7bc0a 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -77,7 +77,6 @@ class Conductor: async def start_wallet(self): if not self.wallet_started: await self.wallet_node.start(self.spv_node) - await self.wallet_node.ledger.on_ready.first self.wallet_started = True async def stop_wallet(self): diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index b6477199a..dd7d586f4 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -87,7 +87,6 @@ class ResolveCommand(BaseResolveTestCase): await self.conductor.spv_node.stop() resolve_task = asyncio.create_task(self.resolve('foo')) await self.conductor.spv_node.start(self.conductor.blockchain_node) - await self.ledger.on_ready.first self.assertIsNotNone((await resolve_task)['claim_id']) async def test_winning_by_effective_amount(self): diff --git a/tests/integration/blockchain/test_sync.py b/tests/integration/blockchain/test_sync.py index 7a3ffd3a7..7af2bd1aa 100644 --- a/tests/integration/blockchain/test_sync.py +++ b/tests/integration/blockchain/test_sync.py @@ -27,7 +27,6 @@ class SyncTests(IntegrationTestCase): wallet_node = WalletNode(WalletManager, RegTestLedger, port=self.api_port) await wallet_node.start(self.conductor.spv_node, seed) self.started_nodes.append(wallet_node) - await wallet_node.ledger.on_ready.first return wallet_node async def test_nodes_with_same_account_stay_in_sync(self): From 2893f1eb9ea333db224e76e0e05008d55650d4f7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 23 Mar 2020 12:36:05 -0300 Subject: [PATCH 2/2] add other taskgroup for non-start-important tasks --- lbry/wallet/ledger.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 8aa384f10..ebdfc7612 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -154,6 +154,7 @@ class Ledger(metaclass=LedgerRegistry): self._tx_cache = pylru.lrucache(100000) self._update_tasks = TaskGroup() + self._other_tasks = TaskGroup() # that we dont need to start self._utxo_reservation_lock = asyncio.Lock() self._header_processing_lock = asyncio.Lock() self._address_update_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock) @@ -335,7 +336,9 @@ class Ledger(metaclass=LedgerRegistry): async def stop(self): self._update_tasks.cancel() + self._other_tasks.cancel() await self._update_tasks.done.wait() + await self._other_tasks.done.wait() await self.network.stop() await self.db.close() await self.headers.close() @@ -352,7 +355,7 @@ class Ledger(metaclass=LedgerRegistry): async with self._header_processing_lock: for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)): await self.headers.ensure_chunk_at(height) - self._update_tasks.add(doit()) + self._other_tasks.add(doit()) await self.update_headers() async def update_headers(self, height=None, headers=None, subscription_update=False):