From 9d5370be5f544df4ccbe9d7789e5be829028166b Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 9 Jul 2020 13:34:08 -0400
Subject: [PATCH] fix

---
 lbry/wallet/ledger.py            | 49 +++++++++++++++++++-------------
 tests/unit/wallet/test_ledger.py | 13 +++++++--
 2 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py
index ffa0c9d86..a303427bd 100644
--- a/lbry/wallet/ledger.py
+++ b/lbry/wallet/ledger.py
@@ -502,6 +502,12 @@ class Ledger(metaclass=LedgerRegistry):
         remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
         we_need = set(remote_history) - set(local_history)
         if not we_need:
+            remote_missing = set(local_history) - set(remote_history)
+            if remote_missing:
+                log.warning(
+                    "%i transactions we have for %s are not in the remote address history",
+                    len(remote_missing), address
+                )
             return True
 
         acquire_lock_tasks = []
@@ -511,18 +517,20 @@ class Ledger(metaclass=LedgerRegistry):
 
         updated_cached_items = {}
         already_synced = set()
+        already_synced_offset = 0
         for i, (txid, remote_height) in enumerate(remote_history):
-            if not acquire_lock_tasks and i < len(local_history) and local_history[i] == (txid, remote_height):
+            if i == already_synced_offset and i < len(local_history) and local_history[i] == (txid, remote_height):
                 pending_synced_history[i] = f'{txid}:{remote_height}:'
                 already_synced.add((txid, remote_height))
+                already_synced_offset += 1
                 continue
             cache_item = self._tx_cache.get(txid)
             if cache_item is None:
                 cache_item = TransactionCacheItem()
                 self._tx_cache[txid] = cache_item
-            if len(acquire_lock_tasks) > 10000:
-                await asyncio.wait(acquire_lock_tasks)
-                acquire_lock_tasks.clear()
+
+        for txid, remote_height in remote_history[already_synced_offset:]:
+            cache_item = self._tx_cache[txid]
             acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
 
         if acquire_lock_tasks:
@@ -576,11 +584,9 @@ class Ledger(metaclass=LedgerRegistry):
                     log.warning("lock was already released?")
                     pass
 
-        log.info("updating address count and status")
         await self.db.save_transaction_io_batch(
             [], address, self.address_to_hash160(address), synced_history
         )
-        log.info("updated address count and status")
         if address_manager is None:
             address_manager = await self.get_address_manager_for_address(address)
 
@@ -623,7 +629,7 @@ class Ledger(metaclass=LedgerRegistry):
             self._known_addresses_out_of_sync.add(address)
             return False
         else:
-            log.info("synced %s", address)
+            log.info("finished syncing transaction history for %s, %i known txs", address, len(local_history))
             return True
 
     async def cache_transaction(self, txid, remote_height, check_local=True):
@@ -682,14 +688,16 @@ class Ledger(metaclass=LedgerRegistry):
         batches = [[]]
         remote_heights = {}
         synced_txs = []
-        heights_in_batch = 1
+        heights_in_batch = 0
+        last_height = 0
         for idx in sorted(to_request):
             txid = to_request[idx][0]
             height = to_request[idx][1]
             remote_heights[txid] = height
-            if idx > 1 and height != remote_heights[batches[-1][-1]]:
+            if height != last_height:
                 heights_in_batch += 1
-            if len(batches[-1]) == 100 or heights_in_batch == 10:
+                last_height = height
+            if len(batches[-1]) == 100 or heights_in_batch == 20:
                 batches.append([])
                 heights_in_batch = 1
             batches[-1].append(txid)
@@ -728,10 +736,8 @@ class Ledger(metaclass=LedgerRegistry):
                         continue
                     cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id)
                     if cache_item is not None:
-                        if cache_item.tx is None:
-                            await cache_item.has_tx.wait()
-                        assert cache_item.tx is not None
-                        txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
+                        if cache_item.tx is not None:
+                            txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
                     else:
                         check_db_for_txos.append(txi.txo_ref.id)
 
@@ -740,12 +746,20 @@ class Ledger(metaclass=LedgerRegistry):
                         txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True
                     )
                 }
+
                 for txi in tx.inputs:
                     if txi.txo_ref.txo is not None:
                         continue
                     referenced_txo = referenced_txos.get(txi.txo_ref.id)
                     if referenced_txo is not None:
                         txi.txo_ref = referenced_txo.ref
+                        continue
+                    cache_item = self._tx_cache.get(txi.txo_ref.id)
+                    if cache_item is None:
+                        cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem()
+                    if cache_item.tx is not None:
+                        txi.txo_ref = cache_item.tx.ref
+
                 synced_txs.append(tx)
                 this_batch_synced.append(tx)
             await self.db.save_transaction_io_batch(
@@ -759,11 +773,8 @@ class Ledger(metaclass=LedgerRegistry):
             if last_showed_synced_count + 100 < len(synced_txs):
                 log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
                 last_showed_synced_count = len(synced_txs)
-
-        await asyncio.wait(
-            [_single_batch(batch) for batch in batches]
-        )
-        log.info("finished syncing history for %s", address)
+        for batch in batches:
+            await _single_batch(batch)
         return synced_txs
 
     async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
diff --git a/tests/unit/wallet/test_ledger.py b/tests/unit/wallet/test_ledger.py
index bfe5cc71b..e1822e2b6 100644
--- a/tests/unit/wallet/test_ledger.py
+++ b/tests/unit/wallet/test_ledger.py
@@ -35,11 +35,17 @@ class MockNetwork:
 
     async def get_transaction_and_merkle(self, tx_hash, known_height=None):
         tx = await self.get_transaction(tx_hash)
-        merkle = {}
+        merkle = {'block_height': -1}
         if known_height:
             merkle = await self.get_merkle(tx_hash, known_height)
         return tx, merkle
 
+    async def get_transaction_batch(self, txids):
+        return {
+            txid: await self.get_transaction_and_merkle(txid)
+            for txid in txids
+        }
+
 
 class LedgerTestCase(AsyncioTestCase):
 
@@ -120,8 +126,9 @@ class TestSynchronization(LedgerTestCase):
 
         self.ledger.network.get_history_called = []
         self.ledger.network.get_transaction_called = []
-        for cache_item in self.ledger._tx_cache.values():
-            cache_item.tx.is_verified = True
+        self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified)
+        self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified)
+        self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified)
         await self.ledger.update_history(address, '')
         self.assertListEqual(self.ledger.network.get_history_called, [address])
         self.assertListEqual(self.ledger.network.get_transaction_called, [])