diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 550f5452f..89b95686b 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -513,7 +513,7 @@ class Ledger(metaclass=LedgerRegistry): ) return True - acquire_lock_tasks = [] + synced_txs = [] to_request = {} pending_synced_history = {} updated_cached_items = {} @@ -531,20 +531,6 @@ class Ledger(metaclass=LedgerRegistry): cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item - unsynced_offset = already_synced_offset - for txid, remote_height in remote_history[already_synced_offset:]: - cache_item = self._tx_cache[txid] - if cache_item.tx is not None and cache_item.tx.height >= remote_height \ - and (not cache_item.tx.is_verified or remote_height < 1): - pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:' - already_synced.add((txid, cache_item.tx.height)) - else: - acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) - unsynced_offset += 1 - - if acquire_lock_tasks: - await asyncio.wait(acquire_lock_tasks) - tx_indexes = {} for i, (txid, remote_height) in enumerate(remote_history): @@ -556,17 +542,24 @@ class Ledger(metaclass=LedgerRegistry): updated_cached_items[txid] = cache_item assert cache_item is not None, 'cache item is none' - assert cache_item.lock.locked(), 'cache lock is not held?' - + # tx = cache_item.tx + # if cache_item.tx is not None and \ + # cache_item.tx.height >= remote_height and \ + # (cache_item.tx.is_verified or remote_height < 1): + # synced_txs.append(cache_item.tx) # cached tx is already up-to-date + # pending_synced_history[i] = f'{tx.id}:{tx.height}:' + # continue to_request[i] = (txid, remote_height) log.debug( "request %i transactions, %i/%i for %s are already synced", len(to_request), len(pending_synced_history), len(remote_history), address ) - requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) + remote_history_txids = set(txid for txid, _ in remote_history) + requested_txes = await self._request_transaction_batch(to_request, remote_history_txids, address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" + synced_txs.append(tx) assert len(pending_synced_history) == len(remote_history), \ f"{len(pending_synced_history)} vs {len(remote_history)}" @@ -584,14 +577,14 @@ class Ledger(metaclass=LedgerRegistry): if cache_item.pending_verifications < 0: log.warning("config value tx cache size %i needs to be increased", cache_size) cache_item.pending_verifications = 0 - try: - cache_item.lock.release() - except RuntimeError: - log.warning("lock was already released?") await self.db.save_transaction_io_batch( - [], address, self.address_to_hash160(address), synced_history + requested_txes, address, self.address_to_hash160(address), synced_history ) + await asyncio.wait([ + self._on_transaction_controller.add(TransactionEvent(address, tx)) + for tx in requested_txes + ]) if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -654,35 +647,7 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def _single_batch(self, batch, remote_heights, header_cache, transactions): - batch_result = await self.network.retriable_call( - self.network.get_transaction_batch, batch - ) - for txid, (raw, merkle) in batch_result.items(): - remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] - cache_item = 
self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(bytes.fromhex(raw.decode() if isinstance(raw, bytes) else raw), - height=remote_height) - tx.height = remote_height - cache_item.tx = tx - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - transactions.append(tx) - return transactions - - async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]): + async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): header_cache = {} batches = [[]] remote_heights = {} @@ -702,21 +667,57 @@ class Ledger(metaclass=LedgerRegistry): if not batches[-1]: batches.pop() + async def _single_batch(batch): + if session_override: + batch_result = await self.network.get_transaction_batch( + batch, restricted=False, session=session_override + ) + else: + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + for txid, (raw, merkle) in batch_result.items(): + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + cache_item = self._tx_cache.get(txid) + if cache_item is None: + cache_item = TransactionCacheItem() + self._tx_cache[txid] = cache_item + tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + cache_item.tx = tx + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) + else: + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + transactions.append(tx) + for batch in batches: - await self._single_batch(batch, remote_heights, header_cache, transactions) + await _single_batch(batch) return transactions - async def _request_transaction_batch(self, to_request, remote_history_size, address): + async def _request_transaction_batch(self, to_request, remote_history, address): header_cache = {} batches = [[]] remote_heights = {} synced_txs = [] + pending_sync = [] heights_in_batch = 0 last_height = 0 for idx in sorted(to_request): txid = to_request[idx][0] height = to_request[idx][1] remote_heights[txid] = height + if txid not in self._tx_cache: + self._tx_cache[txid] = TransactionCacheItem() + elif self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + log.warning("has: %s", txid) + pending_sync.append(self._tx_cache[txid].tx) + continue if height != last_height: heights_in_batch += 1 last_height = height @@ -730,59 +731,73 @@ class Ledger(metaclass=LedgerRegistry): last_showed_synced_count = 0 async def _single_batch(batch): - transactions = await self._single_batch(batch, remote_heights, header_cache, []) - this_batch_synced = [] - - for tx in transactions: - check_db_for_txos = [] - - for txi in tx.inputs: - if 
txi.txo_ref.txo is not None: - continue - cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) - if cache_item is not None: - if cache_item.tx is not None: - txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + for txid, (raw, merkle) in batch_result.items(): + log.warning("arrived batch %s", txid) + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + tx = Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) else: - check_db_for_txos.append(txi.txo_ref.id) + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + self._tx_cache[txid].tx = tx + pending_sync.append(tx) - referenced_txos = {} if not check_db_for_txos else { - txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True - ) - } + async def __sync(tx): + check_db_for_txos = [] + log.warning("%s", tx.id) + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + if txi.txo_ref.tx_ref.id in self._tx_cache: + continue + else: + check_db_for_txos.append(txi.txo_ref.id) - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref - continue - cache_item = self._tx_cache.get(txi.txo_ref.id) - if cache_item is None: - cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem() - if cache_item.tx is not None: - txi.txo_ref = cache_item.tx.ref + referenced_txos = {} if not check_db_for_txos else { + txo.id: txo for txo in await self.db.get_txos( + txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + ) + } - synced_txs.append(tx) - this_batch_synced.append(tx) + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + referenced_txo = referenced_txos.get(txi.txo_ref.id) + if referenced_txo is not None: + txi.txo_ref = referenced_txo.ref + continue + wanted_txid = txi.txo_ref.tx_ref.id + if wanted_txid in self._tx_cache: + log.warning("waiting on %s", wanted_txid) + self.pending += 1 + log.warning("total pending %s", self.pending) + await self._tx_cache[wanted_txid].has_tx.wait() + log.warning("got %s", wanted_txid) + self.pending -= 1 + log.warning("total pending %s", self.pending) + txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref - await self.db.save_transaction_io_batch( - this_batch_synced, address, self.address_to_hash160(address), "" - ) - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in this_batch_synced - ]) + synced_txs.append(tx) nonlocal last_showed_synced_count if last_showed_synced_count + 100 < len(synced_txs): - log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) + log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) last_showed_synced_count = len(synced_txs) - for 
batch in batches: await _single_batch(batch) - + await asyncio.gather(*(__sync(tx) for tx in pending_sync)) return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: @@ -848,13 +863,17 @@ class Ledger(metaclass=LedgerRegistry): include_is_my_output=False, include_sent_supports=False, include_sent_tips=False, - include_received_tips=False) -> Tuple[List[Output], dict, int, int]: + include_received_tips=False, + session_override=None) -> Tuple[List[Output], dict, int, int]: encoded_outputs = await query outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs = [] if len(outputs.txs) > 0: txs: List[Transaction] = [] - txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs)))) + if session_override: + txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs), session_override))) + else: + txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) _txos, blocked = outputs.inflate(txs) @@ -929,17 +948,25 @@ class Ledger(metaclass=LedgerRegistry): async def resolve(self, accounts, urls, new_sdk_server=None, **kwargs): txos = [] urls_copy = list(urls) + if new_sdk_server: resolve = partial(self.network.new_resolve, new_sdk_server) + while urls_copy: + batch, urls_copy = urls_copy[:500], urls_copy[500:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, **kwargs + ))[0] + ) else: - resolve = partial(self.network.retriable_call, self.network.resolve) - while urls_copy: - batch, urls_copy = urls_copy[:100], urls_copy[100:] - txos.extend( - (await self._inflate_outputs( - resolve(batch), accounts, **kwargs - ))[0] - ) + async with self.network.single_call_context(self.network.resolve) as (resolve, session): + while urls_copy: + batch, urls_copy = urls_copy[:500], urls_copy[500:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, session_override=session, **kwargs + ))[0] + ) assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." result = {} @@ -961,13 +988,17 @@ class Ledger(metaclass=LedgerRegistry): new_sdk_server=None, **kwargs) -> Tuple[List[Output], dict, int, int]: if new_sdk_server: claim_search = partial(self.network.new_claim_search, new_sdk_server) - else: - claim_search = self.network.claim_search - return await self._inflate_outputs( - claim_search(**kwargs), accounts, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output, - ) + return await self._inflate_outputs( + claim_search(**kwargs), accounts, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) + async with self.network.single_call_context(self.network.claim_search) as (claim_search, session): + return await self._inflate_outputs( + claim_search(**kwargs), accounts, session_override=session, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: for claim in (await self.claim_search(accounts, claim_id=claim_id, **kwargs))[0]:
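
A note on the locking change above: the per-txid `asyncio.Lock` acquire/release (and the `cache_item.lock.locked()` assertion) is dropped in favor of waiting on the cache item's `has_tx` event, as in `await self._tx_cache[wanted_txid].has_tx.wait()` inside `__sync`. Input resolution then blocks only until the batch that owns the referenced transaction publishes it. A minimal sketch of that producer/waiter pattern, assuming `has_tx` is an `asyncio.Event` set once `tx` is assigned (the `TransactionCacheItem` change itself is outside this diff):

    import asyncio

    class TransactionCacheItem:
        # Stand-in for the real cache item: a tx slot plus a "filled" event.
        def __init__(self):
            self.tx = None
            self.has_tx = asyncio.Event()  # assumed: set when tx is assigned

    async def fetch_batch(cache, txids):
        # Producer: simulate one network batch, publish results, wake waiters.
        await asyncio.sleep(0.01)
        for txid in txids:
            cache[txid].tx = f"raw:{txid}"
            cache[txid].has_tx.set()

    async def resolve_input(cache, wanted_txid):
        # Waiter: the __sync() path blocking on a txid another batch is fetching.
        await cache[wanted_txid].has_tx.wait()
        return cache[wanted_txid].tx

    async def main():
        cache = {txid: TransactionCacheItem() for txid in ("aa", "bb")}
        _, resolved = await asyncio.gather(
            fetch_batch(cache, ["aa", "bb"]),
            resolve_input(cache, "bb"),
        )
        assert resolved == "raw:bb"

    asyncio.run(main())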
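
The `resolve` and `claim_search` hunks raise the batch size from 100 to 500 URLs and pin every batch to one server session through `network.single_call_context`, threading that session down as `session_override` so `_inflate_outputs` pulls the referenced transactions over the same connection. A rough sketch of the context-manager shape this assumes (`Network` here is a toy stand-in, not the real API):

    import asyncio
    from contextlib import asynccontextmanager

    class Network:
        @asynccontextmanager
        async def single_call_context(self, method):
            session = object()  # pretend: one pinned wallet-server session
            async def bound(*args, **kwargs):
                return await method(*args, session=session, **kwargs)
            try:
                yield bound, session
            finally:
                pass  # real code would release/close the session here

        async def resolve(self, urls, session=None):
            await asyncio.sleep(0)  # pretend: one round-trip on `session`
            return {url: f"claim:{url}" for url in urls}

    async def main():
        net = Network()
        urls_copy = [f"lbry://claim-{i}" for i in range(1200)]
        results = {}
        async with net.single_call_context(net.resolve) as (resolve, session):
            while urls_copy:
                batch, urls_copy = urls_copy[:500], urls_copy[500:]  # as in the patch
                results.update(await resolve(batch))
        assert len(results) == 1200

    asyncio.run(main())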
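
Both `_single_batch` bodies verify inclusion the same way: fold the server-supplied merkle branch up to a root with `get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)` and set `tx.is_verified` by comparing it against `header['merkle_root']`. For reference, a conventional double-SHA256 branch fold in that spirit (hash function and byte order are assumptions here, not lifted from the lbry code):

    import hashlib

    def sha256d(data: bytes) -> bytes:
        return hashlib.sha256(hashlib.sha256(data).digest()).digest()

    def merkle_root_from_branch(branch, position, leaf_hash):
        # Each bit of `position` says whether the sibling hash sits on the
        # left or the right of the running hash at that level of the tree.
        h = leaf_hash
        for sibling in branch:
            pair = sibling + h if position & 1 else h + sibling
            h = sha256d(pair)
            position >>= 1
        return h

    # With an empty branch the root is the leaf itself (single-tx block).
    assert merkle_root_from_branch([], 0, sha256d(b"tx")) == sha256d(b"tx")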