diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index d2e09e546..fc9bb5b82 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -606,12 +606,6 @@ class Ledger(metaclass=LedgerRegistry): async def maybe_verify_transaction(self, tx, remote_height, merkle=None): tx.height = remote_height - cached = self._tx_cache.get(tx.id) - if not cached: - # cache txs looked up by transaction_show too - cached = TransactionCacheItem() - self._tx_cache[tx.id] = cached - cached.tx = tx if 0 < remote_height < len(self.headers): # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case if not merkle: @@ -623,17 +617,18 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...]): + async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False): batches = [[]] remote_heights = {} for txid, height in sorted(to_request, key=lambda x: x[1]): - if txid in self._tx_cache: - if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: - yield self._tx_cache[txid].tx - continue - else: - self._tx_cache[txid] = TransactionCacheItem() + if cached: + if txid in self._tx_cache: + if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + yield self._tx_cache[txid].tx + continue + else: + self._tx_cache[txid] = TransactionCacheItem() remote_heights[txid] = height if len(batches[-1]) == 100: batches.append([]) @@ -643,14 +638,16 @@ class Ledger(metaclass=LedgerRegistry): for batch in batches: async for tx in self._single_batch(batch, remote_heights): + if cached: + self._tx_cache[tx.id].tx = tx yield tx async def request_synced_transactions(self, to_request, remote_history, address): - pending_sync = [] + pending_sync = {} async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): - pending_sync.append(asyncio.ensure_future(self._sync(tx, remote_history))) + pending_sync[tx.id] = tx yield tx - await asyncio.gather(*pending_sync) + await asyncio.gather(*(self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values())) async def _single_batch(self, batch, remote_heights): heights = {remote_heights[txid] for txid in batch} @@ -662,7 +659,7 @@ class Ledger(metaclass=LedgerRegistry): await self.maybe_verify_transaction(tx, remote_height, merkle) yield tx - async def _sync(self, tx, remote_history): + async def _sync(self, tx, remote_history, pending_txs): check_db_for_txos = {} for txi in tx.inputs: if txi.txo_ref.txo is not None: @@ -670,9 +667,8 @@ class Ledger(metaclass=LedgerRegistry): wanted_txid = txi.txo_ref.tx_ref.id if wanted_txid not in remote_history: continue - if wanted_txid in self._tx_cache: - await self._tx_cache[wanted_txid].has_tx.wait() - txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref + if wanted_txid in pending_txs: + tx = pending_txs[wanted_txid] else: check_db_for_txos[txi] = txi.txo_ref.id @@ -686,8 +682,11 @@ class Ledger(metaclass=LedgerRegistry): if txi.txo_ref.id in referenced_txos: txi.txo_ref = referenced_txos[txi.txo_ref.id].ref else: - log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) - return tx + tx = await self.db.get_transaction(txid=wanted_txid) + if tx is None: + log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) + else: + txi.txo_ref = tx.outputs[txi.txo_ref.position].ref async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: details = await self.db.get_address(address=address) @@ -757,7 +756,7 @@ class Ledger(metaclass=LedgerRegistry): outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs: List[Transaction] = [] if len(outputs.txs) > 0: - async for tx in self.request_transactions(tuple(outputs.txs)): + async for tx in self.request_transactions(tuple(outputs.txs), cached=True): txs.append(tx) _txos, blocked = outputs.inflate(txs)