diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 70b22b103..ffa0c9d86 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_transaction_controller = StreamController() self.on_transaction = self._on_transaction_controller.stream self.on_transaction.listen( - lambda e: log.info( + lambda e: log.debug( '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id ) @@ -520,9 +520,13 @@ class Ledger(metaclass=LedgerRegistry): if cache_item is None: cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item + if len(acquire_lock_tasks) > 10000: + await asyncio.wait(acquire_lock_tasks) + acquire_lock_tasks.clear() acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) - await asyncio.wait(acquire_lock_tasks) + if acquire_lock_tasks: + await asyncio.wait(acquire_lock_tasks) tx_indexes = {} @@ -546,11 +550,10 @@ class Ledger(metaclass=LedgerRegistry): to_request[i] = (txid, remote_height) log.info("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) - requested_txes = await self._request_transaction_batch(to_request) + requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) - log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}" synced_history = "" @@ -573,14 +576,11 @@ class Ledger(metaclass=LedgerRegistry): log.warning("lock was already released?") pass + log.info("updating address count and status") await self.db.save_transaction_io_batch( - synced_txs, address, self.address_to_hash160(address), synced_history + [], address, self.address_to_hash160(address), synced_history ) - - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in synced_txs - ]) + log.info("updated address count and status") if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -623,6 +623,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync.add(address) return False else: + log.info("synced %s", address) return True async def cache_transaction(self, txid, remote_height, check_local=True): @@ -676,23 +677,29 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def _request_transaction_batch(self, to_request): + async def _request_transaction_batch(self, to_request, remote_history_size, address): header_cache = {} batches = [[]] remote_heights = {} synced_txs = [] - + heights_in_batch = 1 for idx in sorted(to_request): txid = to_request[idx][0] height = to_request[idx][1] remote_heights[txid] = height - if len(batches[-1]) == 1: + if idx > 1 and height != remote_heights[batches[-1][-1]]: + heights_in_batch += 1 + if len(batches[-1]) == 100 or heights_in_batch == 10: batches.append([]) + heights_in_batch = 1 batches[-1].append(txid) if not len(batches[-1]): batches.pop() + last_showed_synced_count = 0 + async def _single_batch(batch): + this_batch_synced = [] batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] @@ -740,13 +747,23 @@ class Ledger(metaclass=LedgerRegistry): if referenced_txo is not None: txi.txo_ref = referenced_txo.ref synced_txs.append(tx) - - if batches: + this_batch_synced.append(tx) + await self.db.save_transaction_io_batch( + this_batch_synced, address, self.address_to_hash160(address), "" + ) await asyncio.wait([ - asyncio.create_task( - _single_batch(_batch) - ) for _batch in batches + self._on_transaction_controller.add(TransactionEvent(address, tx)) + for tx in this_batch_synced ]) + nonlocal last_showed_synced_count + if last_showed_synced_count + 100 < len(synced_txs): + log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) + last_showed_synced_count = len(synced_txs) + + await asyncio.wait( + [_single_batch(batch) for batch in batches] + ) + log.info("finished syncing history for %s", address) return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: