diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index b9487aeba..d30d018e7 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -513,7 +513,6 @@ class Ledger(metaclass=LedgerRegistry): return True acquire_lock_tasks = [] - synced_txs = [] to_request = {} pending_synced_history = {} updated_cached_items = {} @@ -531,9 +530,16 @@ class Ledger(metaclass=LedgerRegistry): cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item + unsynced_offset = already_synced_offset for txid, remote_height in remote_history[already_synced_offset:]: cache_item = self._tx_cache[txid] - acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) + if cache_item.tx is not None and cache_item.tx.height >= remote_height \ + and (not cache_item.tx.is_verified or remote_height < 1): + pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:' + already_synced.add((txid, cache_item.tx.height)) + else: + acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) + unsynced_offset += 1 if acquire_lock_tasks: await asyncio.wait(acquire_lock_tasks) @@ -550,23 +556,16 @@ class Ledger(metaclass=LedgerRegistry): assert cache_item is not None, 'cache item is none' assert cache_item.lock.locked(), 'cache lock is not held?' - # tx = cache_item.tx - # if cache_item.tx is not None and \ - # cache_item.tx.height >= remote_height and \ - # (cache_item.tx.is_verified or remote_height < 1): - # synced_txs.append(cache_item.tx) # cached tx is already up-to-date - # pending_synced_history[i] = f'{tx.id}:{tx.height}:' - # continue + to_request[i] = (txid, remote_height) log.debug( - "request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), - len(remote_history), address + "request %i transactions, %i/%i for %s are already synced", len(to_request), + len(pending_synced_history), len(remote_history), address ) requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" - synced_txs.append(tx) assert len(pending_synced_history) == len(remote_history), \ f"{len(pending_synced_history)} vs {len(remote_history)}" diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index fd7477c6b..0a996d68e 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -651,7 +651,7 @@ class SessionManager: ] if header_tasks: self.logger.info(f'notify {len(header_tasks)} sessions of new header') - await asyncio.wait(header_tasks) + asyncio.create_task(asyncio.wait(header_tasks)) for hashX in touched.intersection(self.mempool_statuses.keys()): self.mempool_statuses.pop(hashX, None)