From 270da80d64d78e0e8797e63a35f4a9e2e48947a9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 23 Dec 2020 19:10:19 -0300 Subject: [PATCH] return whole batches --- lbry/wallet/ledger.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index c5dc64494..a2b634bdf 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -605,6 +605,7 @@ class Ledger(metaclass=LedgerRegistry): header = await self.headers.get(remote_height) tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] + return tx async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False): batches = [[]] @@ -614,7 +615,8 @@ class Ledger(metaclass=LedgerRegistry): if cached: if txid in self._tx_cache: if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: - yield self._tx_cache[txid].tx + tx = self._tx_cache[txid].tx + yield {tx.id: self._tx_cache[txid].tx} continue else: self._tx_cache[txid] = TransactionCacheItem() @@ -626,29 +628,29 @@ class Ledger(metaclass=LedgerRegistry): batches.pop() for batch in batches: - async for tx in self._single_batch(batch, remote_heights): - if cached: - self._tx_cache[tx.id].tx = tx - yield tx + txs = await self._single_batch(batch, remote_heights) + if cached: + for txid, tx in txs.items(): + self._tx_cache[txid].tx = tx + yield txs async def request_synced_transactions(self, to_request, remote_history, address): - pending_sync = {} - async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): - pending_sync[tx.id] = tx - yield tx - if len(pending_sync) > 10: - await self._sync_and_save_batch(address, remote_history, pending_sync) - await self._sync_and_save_batch(address, remote_history, pending_sync) + async for txs in self.request_transactions(((txid, height) for txid, height in to_request.values())): + for tx in txs.values(): + yield tx + await self._sync_and_save_batch(address, remote_history, txs) async def _single_batch(self, batch, remote_heights): heights = {remote_heights[txid] for txid in batch} unrestriced = 0 < min(heights) < max(heights) < max(self.headers.checkpoints or [0]) batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, not unrestriced) + txs = {} for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] tx = Transaction(unhexlify(raw), height=remote_height) + txs[tx.id] = tx await self.maybe_verify_transaction(tx, remote_height, merkle) - yield tx + return txs async def _sync_and_save_batch(self, address, remote_history, pending_txs): await asyncio.gather(*(self._sync(tx, remote_history, pending_txs) for tx in pending_txs.values())) @@ -757,7 +759,7 @@ class Ledger(metaclass=LedgerRegistry): txs: List[Transaction] = [] if len(outputs.txs) > 0: async for tx in self.request_transactions(tuple(outputs.txs), cached=True): - txs.append(tx) + txs.extend(tx.values()) _txos, blocked = outputs.inflate(txs)