From c73f64c42405a1675123bf99f56d36f917968eff Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 21 Dec 2020 15:33:06 -0300 Subject: [PATCH] unrestricted batches --- lbry/wallet/ledger.py | 17 ++++++++++++----- tests/unit/wallet/test_ledger.py | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 8d0dfd251..b7932ad19 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -661,24 +661,31 @@ class Ledger(metaclass=LedgerRegistry): if not batches[-1]: batches.pop() - for batch in batches: - async for tx in self._single_batch(batch, remote_heights): + for routine in asyncio.as_completed([self._single_batch(batch, remote_heights) for batch in batches]): + for tx in await routine: yield tx async def request_synced_transactions(self, to_request, remote_history, address): pending_sync = [] async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): pending_sync.append(asyncio.ensure_future(self._sync(tx, remote_history))) + log.info("%d/%d", len(pending_sync), len(to_request)) yield tx await asyncio.gather(*pending_sync) async def _single_batch(self, batch, remote_heights): - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + heights = {remote_heights[txid] for txid in batch} + unrestriced = 0 < min(heights) < max(heights) < max(self.headers.checkpoints or [0]) + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, not unrestriced) + txs = [] + pending_verification = [] for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] tx = Transaction(unhexlify(raw), height=remote_height) - await self.maybe_verify_transaction(tx, remote_height, merkle) - yield tx + pending_verification.append(self.maybe_verify_transaction(tx, remote_height, merkle)) + txs.append(tx) + await asyncio.gather(*pending_verification) + return txs async def _sync(self, tx, remote_history): check_db_for_txos = {} diff --git a/tests/unit/wallet/test_ledger.py b/tests/unit/wallet/test_ledger.py index e1822e2b6..c8385c837 100644 --- a/tests/unit/wallet/test_ledger.py +++ b/tests/unit/wallet/test_ledger.py @@ -40,7 +40,7 @@ class MockNetwork: merkle = await self.get_merkle(tx_hash, known_height) return tx, merkle - async def get_transaction_batch(self, txids): + async def get_transaction_batch(self, txids, restricted): return { txid: await self.get_transaction_and_merkle(txid) for txid in txids