diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 44a7bdb59..60ad645e7 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -125,6 +125,7 @@ class Ledger(metaclass=LedgerRegistry): self.network.on_status.listen(self.process_status_update) self.accounts = [] + self.pending = 0 self.fee_per_byte: int = self.config.get('fee_per_byte', self.default_fee_per_byte) self._on_transaction_controller = StreamController() @@ -513,7 +514,6 @@ class Ledger(metaclass=LedgerRegistry): ) return True - acquire_lock_tasks = [] synced_txs = [] to_request = {} pending_synced_history = {} @@ -532,13 +532,6 @@ class Ledger(metaclass=LedgerRegistry): cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item - for txid, remote_height in remote_history[already_synced_offset:]: - cache_item = self._tx_cache[txid] - acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) - - if acquire_lock_tasks: - await asyncio.wait(acquire_lock_tasks) - tx_indexes = {} for i, (txid, remote_height) in enumerate(remote_history): @@ -550,7 +543,6 @@ class Ledger(metaclass=LedgerRegistry): updated_cached_items[txid] = cache_item assert cache_item is not None, 'cache item is none' - assert cache_item.lock.locked(), 'cache lock is not held?' # tx = cache_item.tx # if cache_item.tx is not None and \ # cache_item.tx.height >= remote_height and \ @@ -564,7 +556,8 @@ class Ledger(metaclass=LedgerRegistry): "request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address ) - requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) + remote_history_txids = set(txid for txid, _ in remote_history) + requested_txes = await self._request_transaction_batch(to_request, remote_history_txids, address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) @@ -585,14 +578,14 @@ class Ledger(metaclass=LedgerRegistry): if cache_item.pending_verifications < 0: log.warning("config value tx cache size %i needs to be increased", cache_size) cache_item.pending_verifications = 0 - try: - cache_item.lock.release() - except RuntimeError: - log.warning("lock was already released?") await self.db.save_transaction_io_batch( - [], address, self.address_to_hash160(address), synced_history + requested_txes, address, self.address_to_hash160(address), synced_history ) + await asyncio.wait([ + self._on_transaction_controller.add(TransactionEvent(address, tx)) + for tx in requested_txes + ]) if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -742,17 +735,24 @@ class Ledger(metaclass=LedgerRegistry): await _single_batch(batch) return transactions - async def _request_transaction_batch(self, to_request, remote_history_size, address): + async def _request_transaction_batch(self, to_request, remote_history, address): header_cache = {} batches = [[]] remote_heights = {} synced_txs = [] + pending_sync = [] heights_in_batch = 0 last_height = 0 for idx in sorted(to_request): txid = to_request[idx][0] height = to_request[idx][1] remote_heights[txid] = height + if txid not in self._tx_cache: + self._tx_cache[txid] = TransactionCacheItem() + elif self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + log.warning("has: %s", txid) + pending_sync.append(self._tx_cache[txid].tx) + continue if height != last_height: heights_in_batch += 1 last_height = height @@ -766,18 +766,13 @@ class Ledger(metaclass=LedgerRegistry): last_showed_synced_count = 0 async def _single_batch(batch): - this_batch_synced = [] batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) for txid, (raw, merkle) in batch_result.items(): + log.warning("arrived batch %s", txid) remote_height = remote_heights[txid] merkle_height = merkle['block_height'] - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) + tx = Transaction(unhexlify(raw), height=remote_height) tx.height = remote_height - cache_item.tx = tx if 'merkle' in merkle and remote_heights[txid] > 0: merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) try: @@ -788,52 +783,57 @@ class Ledger(metaclass=LedgerRegistry): header_cache[remote_heights[txid]] = header tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - check_db_for_txos = [] + self._tx_cache[txid].tx = tx + pending_sync.append(tx) - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) - if cache_item is not None: - if cache_item.tx is not None: - txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref - else: - check_db_for_txos.append(txi.txo_ref.id) + async def __sync(tx): + check_db_for_txos = [] + log.warning("%s", tx.id) + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + if txi.txo_ref.tx_ref.id in self._tx_cache: + continue + else: + check_db_for_txos.append(txi.txo_ref.id) - referenced_txos = {} if not check_db_for_txos else { - txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True - ) - } + referenced_txos = {} if not check_db_for_txos else { + txo.id: txo for txo in await self.db.get_txos( + txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + ) + } - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref - continue - cache_item = self._tx_cache.get(txi.txo_ref.id) - if cache_item is None: - cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem() - if cache_item.tx is not None: - txi.txo_ref = cache_item.tx.ref + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + referenced_txo = referenced_txos.get(txi.txo_ref.id) + if referenced_txo is not None: + txi.txo_ref = referenced_txo.ref + continue + wanted_txid = txi.txo_ref.tx_ref.id + if wanted_txid in self._tx_cache: + log.warning("waiting on %s", wanted_txid) + self.pending += 1 + log.warning("total pending %s", self.pending) + await self._tx_cache[wanted_txid].has_tx.wait() + log.warning("got %s", wanted_txid) + self.pending -= 1 + log.warning("total pending %s", self.pending) + txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref - synced_txs.append(tx) - this_batch_synced.append(tx) - await self.db.save_transaction_io_batch( - this_batch_synced, address, self.address_to_hash160(address), "" - ) - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in this_batch_synced - ]) + synced_txs.append(tx) nonlocal last_showed_synced_count if last_showed_synced_count + 100 < len(synced_txs): - log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) + log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) last_showed_synced_count = len(synced_txs) + for batch in batches: await _single_batch(batch) + await asyncio.gather(*(__sync(tx) for tx in pending_sync)) return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: