diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index b2dd8e59a..111bee067 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -556,10 +556,14 @@ class Ledger(metaclass=LedgerRegistry): len(remote_history), address ) remote_history_txids = set(txid for txid, _ in remote_history) - requested_txes = await self.request_synced_transactions(to_request, remote_history_txids, address) - for tx in requested_txes: + async for tx in self.request_synced_transactions(to_request, remote_history_txids, address): pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) + if len(synced_txs) >= 100: + await self._save_sync_batch(synced_txs, address) + log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request)) + await self._save_sync_batch(synced_txs, address) + log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request)) assert len(pending_synced_history) == len(remote_history), \ f"{len(pending_synced_history)} vs {len(remote_history)}" @@ -570,6 +574,7 @@ class Ledger(metaclass=LedgerRegistry): if f"{txid}:{height}:" != pending_synced_history[i]: log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i]) synced_history += pending_synced_history[i] + await self.db.set_address_history(address, synced_history) cache_size = self.config.get("tx_cache_size", 10_000) for txid, cache_item in updated_cached_items.items(): @@ -578,13 +583,6 @@ class Ledger(metaclass=LedgerRegistry): log.warning("config value tx cache size %i needs to be increased", cache_size) cache_item.pending_verifications = 0 - await self.db.save_transaction_io_batch( - requested_txes, address, self.address_to_hash160(address), synced_history - ) - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in requested_txes - ]) if address_manager is None: address_manager = await 
self.get_address_manager_for_address(address) @@ -647,88 +645,68 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): - header_cache = {} + async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...]): batches = [[]] remote_heights = {} - transactions = [] - heights_in_batch = 0 - last_height = 0 for txid, height in sorted(to_request, key=lambda x: x[1]): remote_heights[txid] = height - if height != last_height: - heights_in_batch += 1 - last_height = height - if len(batches[-1]) == 100 or heights_in_batch == 20: + if len(batches[-1]) == 100: batches.append([]) - heights_in_batch = 1 batches[-1].append(txid) if not batches[-1]: batches.pop() for batch in batches: - transactions.extend(await self._single_batch(batch, remote_heights, header_cache)) - return transactions + async for tx in self._single_batch(batch, remote_heights): + yield tx async def request_synced_transactions(self, to_request, remote_history, address): - pending_sync = await self.request_transactions(((txid, height) for txid, height in to_request.values())) - await asyncio.gather(*(self._sync(tx, remote_history) for tx in pending_sync)) - return pending_sync + async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): + await self._sync(tx, remote_history) + yield tx - async def _single_batch(self, batch, remote_heights, header_cache): - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) - transactions = [] + async def _save_sync_batch(self, synced_txs, address): + await self.db.save_transaction_io_batch( + synced_txs, address, self.address_to_hash160(address), "" + ) + while synced_txs: + self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop())) + + async def _single_batch(self, batch, remote_heights): + 
restricted = max(remote_heights.values()) > (self.network.remote_height - 10) + restricted = restricted or min(remote_heights.values()) < 0 + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, restricted) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] tx = Transaction(unhexlify(raw), height=remote_height) - tx.height = remote_height - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - self._tx_cache[txid].tx = tx - transactions.append(tx) - return transactions + await self.maybe_verify_transaction(tx, remote_height, merkle) + yield tx async def _sync(self, tx, remote_history): - check_db_for_txos = [] + check_db_for_txos = {} for txi in tx.inputs: if txi.txo_ref.txo is not None: continue - if txi.txo_ref.tx_ref.id not in remote_history: - continue - if txi.txo_ref.tx_ref.id in self._tx_cache: + wanted_txid = txi.txo_ref.tx_ref.id + if wanted_txid not in remote_history: continue + if wanted_txid in self._tx_cache: + txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref else: - check_db_for_txos.append(txi.txo_ref.id) + check_db_for_txos[txi] = txi.txo_ref.id referenced_txos = {} if not check_db_for_txos else { txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + txoid__in=check_db_for_txos.values(), order_by='txo.txoid', no_tx=True ) } - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - if txi.txo_ref.tx_ref.id not in 
remote_history: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref - continue - wanted_txid = txi.txo_ref.tx_ref.id - if wanted_txid in self._tx_cache: - await self._tx_cache[wanted_txid].has_tx.wait() - txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref + for txi in check_db_for_txos: + if txi.txo_ref.id in referenced_txos: + txi.txo_ref = referenced_txos[txi.txo_ref.id].ref + else: + log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) return tx async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: @@ -802,7 +780,8 @@ class Ledger(metaclass=LedgerRegistry): if len(outputs.txs) > 0: txs: List[Transaction] = [] if session_override: - txs.extend((await self.request_transactions(tuple(outputs.txs), session_override))) + async for tx in self.request_transactions(tuple(outputs.txs)): + txs.append(tx) else: txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs))))