Merge pull request #2877 from lbryio/on_ready_fixup

wait for tx sync to finish but not header sync
This commit is contained in:
Jack Robison 2020-03-23 13:46:47 -04:00 committed by GitHub
commit 60d89506a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 5 additions and 5 deletions

View file

@ -370,7 +370,6 @@ class CommandTestCase(IntegrationTestCase):
) )
self.extra_wallet_node_port += 1 self.extra_wallet_node_port += 1
await wallet_node.start(self.conductor.spv_node, seed=seed) await wallet_node.start(self.conductor.spv_node, seed=seed)
await wallet_node.ledger.on_ready.first
self.extra_wallet_nodes.append(wallet_node) self.extra_wallet_nodes.append(wallet_node)
upload_dir = os.path.join(wallet_node.data_path, 'uploads') upload_dir = os.path.join(wallet_node.data_path, 'uploads')

View file

@ -154,6 +154,7 @@ class Ledger(metaclass=LedgerRegistry):
self._tx_cache = pylru.lrucache(100000) self._tx_cache = pylru.lrucache(100000)
self._update_tasks = TaskGroup() self._update_tasks = TaskGroup()
self._other_tasks = TaskGroup() # that we dont need to start
self._utxo_reservation_lock = asyncio.Lock() self._utxo_reservation_lock = asyncio.Lock()
self._header_processing_lock = asyncio.Lock() self._header_processing_lock = asyncio.Lock()
self._address_update_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self._address_update_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
@ -323,6 +324,7 @@ class Ledger(metaclass=LedgerRegistry):
else: else:
await self._report_state() await self._report_state()
self.on_transaction.listen(self._reset_balance_cache) self.on_transaction.listen(self._reset_balance_cache)
await self.on_ready.first
async def join_network(self, *_): async def join_network(self, *_):
log.info("Subscribing and updating accounts.") log.info("Subscribing and updating accounts.")
@ -334,7 +336,9 @@ class Ledger(metaclass=LedgerRegistry):
async def stop(self): async def stop(self):
self._update_tasks.cancel() self._update_tasks.cancel()
self._other_tasks.cancel()
await self._update_tasks.done.wait() await self._update_tasks.done.wait()
await self._other_tasks.done.wait()
await self.network.stop() await self.network.stop()
await self.db.close() await self.db.close()
await self.headers.close() await self.headers.close()
@ -351,7 +355,7 @@ class Ledger(metaclass=LedgerRegistry):
async with self._header_processing_lock: async with self._header_processing_lock:
for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)): for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)):
await self.headers.ensure_chunk_at(height) await self.headers.ensure_chunk_at(height)
self._update_tasks.add(doit()) self._other_tasks.add(doit())
await self.update_headers() await self.update_headers()
async def update_headers(self, height=None, headers=None, subscription_update=False): async def update_headers(self, height=None, headers=None, subscription_update=False):

View file

@ -77,7 +77,6 @@ class Conductor:
async def start_wallet(self): async def start_wallet(self):
if not self.wallet_started: if not self.wallet_started:
await self.wallet_node.start(self.spv_node) await self.wallet_node.start(self.spv_node)
await self.wallet_node.ledger.on_ready.first
self.wallet_started = True self.wallet_started = True
async def stop_wallet(self): async def stop_wallet(self):

View file

@ -87,7 +87,6 @@ class ResolveCommand(BaseResolveTestCase):
await self.conductor.spv_node.stop() await self.conductor.spv_node.stop()
resolve_task = asyncio.create_task(self.resolve('foo')) resolve_task = asyncio.create_task(self.resolve('foo'))
await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.ledger.on_ready.first
self.assertIsNotNone((await resolve_task)['claim_id']) self.assertIsNotNone((await resolve_task)['claim_id'])
async def test_winning_by_effective_amount(self): async def test_winning_by_effective_amount(self):

View file

@ -27,7 +27,6 @@ class SyncTests(IntegrationTestCase):
wallet_node = WalletNode(WalletManager, RegTestLedger, port=self.api_port) wallet_node = WalletNode(WalletManager, RegTestLedger, port=self.api_port)
await wallet_node.start(self.conductor.spv_node, seed) await wallet_node.start(self.conductor.spv_node, seed)
self.started_nodes.append(wallet_node) self.started_nodes.append(wallet_node)
await wallet_node.ledger.on_ready.first
return wallet_node return wallet_node
async def test_nodes_with_same_account_stay_in_sync(self): async def test_nodes_with_same_account_stay_in_sync(self):