diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index d03862dd4..3ab62402d 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -654,6 +654,33 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] + async def _single_batch(self, batch, remote_heights, header_cache, transactions): + batch_result = await self.network.retriable_call( + self.network.get_transaction_batch, batch + ) + for txid, (raw, merkle) in batch_result.items(): + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + cache_item = self._tx_cache.get(txid) + if cache_item is None: + cache_item = TransactionCacheItem() + self._tx_cache[txid] = cache_item + tx = cache_item.tx or Transaction(bytes.fromhex(raw.decode()), height=remote_height) + tx.height = remote_height + cache_item.tx = tx + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) + else: + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + transactions.append(tx) + return transactions + async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]): header_cache = {} batches = [[]] @@ -674,34 +701,8 @@ class Ledger(metaclass=LedgerRegistry): if not batches[-1]: batches.pop() - async def _single_batch(batch): - batch_result = await self.network.retriable_call( - self.network.get_transaction_batch, batch, restricted=False - ) - for txid, (raw, merkle) in batch_result.items(): - remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height) - tx.height = remote_height - cache_item.tx = tx - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - transactions.append(tx) - for batch in batches: - await _single_batch(batch) + await self._single_batch(batch, remote_heights, header_cache, transactions) return transactions async def _request_transaction_batch(self, to_request, remote_history_size, address): @@ -728,28 +729,10 @@ class Ledger(metaclass=LedgerRegistry): last_showed_synced_count = 0 async def _single_batch(batch): + transactions = await self._single_batch(batch, remote_heights, header_cache, []) this_batch_synced = [] - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) - for txid, (raw, merkle) in batch_result.items(): - remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) - tx.height = remote_height - cache_item.tx = tx - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] + + for tx in transactions: check_db_for_txos = [] for txi in tx.inputs: @@ -783,6 +766,7 @@ class Ledger(metaclass=LedgerRegistry): synced_txs.append(tx) this_batch_synced.append(tx) + await self.db.save_transaction_io_batch( this_batch_synced, address, self.address_to_hash160(address), "" ) @@ -794,8 +778,10 @@ class Ledger(metaclass=LedgerRegistry): if last_showed_synced_count + 100 < len(synced_txs): log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) last_showed_synced_count = len(synced_txs) + for batch in batches: await _single_batch(batch) + return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: