From 41ecb702979efbe1f7e83e88ae711ab92fbd3421 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 19 Dec 2020 17:00:01 -0300 Subject: [PATCH 01/14] join network can only happen after initial header sync returns --- lbry/wallet/ledger.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index d30d018e7..550f5452f 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -123,7 +123,6 @@ class Ledger(metaclass=LedgerRegistry): self.network: Network = self.config.get('network') or Network(self) self.network.on_header.listen(self.receive_header) self.network.on_status.listen(self.process_status_update) - self.network.on_connected.listen(self.join_network) self.accounts = [] self.fee_per_byte: int = self.config.get('fee_per_byte', self.default_fee_per_byte) @@ -329,6 +328,8 @@ class Ledger(metaclass=LedgerRegistry): await self.network.on_connected.first async with self._header_processing_lock: await self._update_tasks.add(self.initial_headers_sync()) + self.network.on_connected.listen(self.join_network) + asyncio.ensure_future(self.join_network()) await fully_synced await self.db.release_all_outputs() await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts)) From e8261b000ee005816202a495514a71063f3ac15a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 19 Dec 2020 23:39:01 -0300 Subject: [PATCH 02/14] wip, see jack --- lbry/wallet/ledger.py | 259 +++++++++++++++++++++++------------------- 1 file changed, 145 insertions(+), 114 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 550f5452f..89b95686b 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -513,7 +513,7 @@ class Ledger(metaclass=LedgerRegistry): ) return True - acquire_lock_tasks = [] + synced_txs = [] to_request = {} pending_synced_history = {} updated_cached_items = {} @@ -531,20 +531,6 @@ class Ledger(metaclass=LedgerRegistry): cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item - unsynced_offset = already_synced_offset - for txid, remote_height in remote_history[already_synced_offset:]: - cache_item = self._tx_cache[txid] - if cache_item.tx is not None and cache_item.tx.height >= remote_height \ - and (not cache_item.tx.is_verified or remote_height < 1): - pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:' - already_synced.add((txid, cache_item.tx.height)) - else: - acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) - unsynced_offset += 1 - - if acquire_lock_tasks: - await asyncio.wait(acquire_lock_tasks) - tx_indexes = {} for i, (txid, remote_height) in enumerate(remote_history): @@ -556,17 +542,24 @@ class Ledger(metaclass=LedgerRegistry): updated_cached_items[txid] = cache_item assert cache_item is not None, 'cache item is none' - assert cache_item.lock.locked(), 'cache lock is not held?' - + # tx = cache_item.tx + # if cache_item.tx is not None and \ + # cache_item.tx.height >= remote_height and \ + # (cache_item.tx.is_verified or remote_height < 1): + # synced_txs.append(cache_item.tx) # cached tx is already up-to-date + # pending_synced_history[i] = f'{tx.id}:{tx.height}:' + # continue to_request[i] = (txid, remote_height) log.debug( "request %i transactions, %i/%i for %s are already synced", len(to_request), len(pending_synced_history), len(remote_history), address ) - requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) + remote_history_txids = set(txid for txid, _ in remote_history) + requested_txes = await self._request_transaction_batch(to_request, remote_history_txids, address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" + synced_txs.append(tx) assert len(pending_synced_history) == len(remote_history), \ f"{len(pending_synced_history)} vs {len(remote_history)}" @@ -584,14 +577,14 @@ class Ledger(metaclass=LedgerRegistry): if cache_item.pending_verifications < 0: log.warning("config value tx cache size %i needs to be increased", cache_size) cache_item.pending_verifications = 0 - try: - cache_item.lock.release() - except RuntimeError: - log.warning("lock was already released?") await self.db.save_transaction_io_batch( - [], address, self.address_to_hash160(address), synced_history + requested_txes, address, self.address_to_hash160(address), synced_history ) + await asyncio.wait([ + self._on_transaction_controller.add(TransactionEvent(address, tx)) + for tx in requested_txes + ]) if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -654,35 +647,7 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def _single_batch(self, batch, remote_heights, header_cache, transactions): - batch_result = await self.network.retriable_call( - self.network.get_transaction_batch, batch - ) - for txid, (raw, merkle) in batch_result.items(): - remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(bytes.fromhex(raw.decode() if isinstance(raw, bytes) else raw), - height=remote_height) - tx.height = remote_height - cache_item.tx = tx - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - transactions.append(tx) - return transactions - - async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]): + async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): header_cache = {} batches = [[]] remote_heights = {} @@ -702,21 +667,57 @@ class Ledger(metaclass=LedgerRegistry): if not batches[-1]: batches.pop() + async def _single_batch(batch): + if session_override: + batch_result = await self.network.get_transaction_batch( + batch, restricted=False, session=session_override + ) + else: + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + for txid, (raw, merkle) in batch_result.items(): + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + cache_item = self._tx_cache.get(txid) + if cache_item is None: + cache_item = TransactionCacheItem() + self._tx_cache[txid] = cache_item + tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + cache_item.tx = tx + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) + else: + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + transactions.append(tx) + for batch in batches: - await self._single_batch(batch, remote_heights, header_cache, transactions) + await _single_batch(batch) return transactions - async def _request_transaction_batch(self, to_request, remote_history_size, address): + async def _request_transaction_batch(self, to_request, remote_history, address): header_cache = {} batches = [[]] remote_heights = {} synced_txs = [] + pending_sync = [] heights_in_batch = 0 last_height = 0 for idx in sorted(to_request): txid = to_request[idx][0] height = to_request[idx][1] remote_heights[txid] = height + if txid not in self._tx_cache: + self._tx_cache[txid] = TransactionCacheItem() + elif self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + log.warning("has: %s", txid) + pending_sync.append(self._tx_cache[txid].tx) + continue if height != last_height: heights_in_batch += 1 last_height = height @@ -730,59 +731,73 @@ class Ledger(metaclass=LedgerRegistry): last_showed_synced_count = 0 async def _single_batch(batch): - transactions = await self._single_batch(batch, remote_heights, header_cache, []) - this_batch_synced = [] - - for tx in transactions: - check_db_for_txos = [] - - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) - if cache_item is not None: - if cache_item.tx is not None: - txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + for txid, (raw, merkle) in batch_result.items(): + log.warning("arrived batch %s", txid) + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + tx = Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) else: - check_db_for_txos.append(txi.txo_ref.id) + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + self._tx_cache[txid].tx = tx + pending_sync.append(tx) - referenced_txos = {} if not check_db_for_txos else { - txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True - ) - } + async def __sync(tx): + check_db_for_txos = [] + log.warning("%s", tx.id) + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + if txi.txo_ref.tx_ref.id in self._tx_cache: + continue + else: + check_db_for_txos.append(txi.txo_ref.id) - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref - continue - cache_item = self._tx_cache.get(txi.txo_ref.id) - if cache_item is None: - cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem() - if cache_item.tx is not None: - txi.txo_ref = cache_item.tx.ref + referenced_txos = {} if not check_db_for_txos else { + txo.id: txo for txo in await self.db.get_txos( + txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + ) + } - synced_txs.append(tx) - this_batch_synced.append(tx) + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + referenced_txo = referenced_txos.get(txi.txo_ref.id) + if referenced_txo is not None: + txi.txo_ref = referenced_txo.ref + continue + wanted_txid = txi.txo_ref.tx_ref.id + if wanted_txid in self._tx_cache: + log.warning("waiting on %s", wanted_txid) + self.pending += 1 + log.warning("total pending %s", self.pending) + await self._tx_cache[wanted_txid].has_tx.wait() + log.warning("got %s", wanted_txid) + self.pending -= 1 + log.warning("total pending %s", self.pending) + txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref - await self.db.save_transaction_io_batch( - this_batch_synced, address, self.address_to_hash160(address), "" - ) - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in this_batch_synced - ]) + synced_txs.append(tx) nonlocal last_showed_synced_count if last_showed_synced_count + 100 < len(synced_txs): - log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) + log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) last_showed_synced_count = len(synced_txs) - for batch in batches: await _single_batch(batch) - + await asyncio.gather(*(__sync(tx) for tx in pending_sync)) return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: @@ -848,13 +863,17 @@ class Ledger(metaclass=LedgerRegistry): include_is_my_output=False, include_sent_supports=False, include_sent_tips=False, - include_received_tips=False) -> Tuple[List[Output], dict, int, int]: + include_received_tips=False, + session_override=None) -> Tuple[List[Output], dict, int, int]: encoded_outputs = await query outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs = [] if len(outputs.txs) > 0: txs: List[Transaction] = [] - txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs)))) + if session_override: + txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs), session_override))) + else: + txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) _txos, blocked = outputs.inflate(txs) @@ -929,17 +948,25 @@ class Ledger(metaclass=LedgerRegistry): async def resolve(self, accounts, urls, new_sdk_server=None, **kwargs): txos = [] urls_copy = list(urls) + if new_sdk_server: resolve = partial(self.network.new_resolve, new_sdk_server) + while urls_copy: + batch, urls_copy = urls_copy[:500], urls_copy[500:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, **kwargs + ))[0] + ) else: - resolve = partial(self.network.retriable_call, self.network.resolve) - while urls_copy: - batch, urls_copy = urls_copy[:100], urls_copy[100:] - txos.extend( - (await self._inflate_outputs( - resolve(batch), accounts, **kwargs - ))[0] - ) + async with self.network.single_call_context(self.network.resolve) as (resolve, session): + while urls_copy: + batch, urls_copy = urls_copy[:500], urls_copy[500:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, session_override=session, **kwargs + ))[0] + ) assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." result = {} @@ -961,13 +988,17 @@ class Ledger(metaclass=LedgerRegistry): new_sdk_server=None, **kwargs) -> Tuple[List[Output], dict, int, int]: if new_sdk_server: claim_search = partial(self.network.new_claim_search, new_sdk_server) - else: - claim_search = self.network.claim_search - return await self._inflate_outputs( - claim_search(**kwargs), accounts, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output, - ) + return await self._inflate_outputs( + claim_search(**kwargs), accounts, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) + async with self.network.single_call_context(self.network.claim_search) as (claim_search, session): + return await self._inflate_outputs( + claim_search(**kwargs), accounts, session_override=session, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: for claim in (await self.claim_search(accounts, claim_id=claim_id, **kwargs))[0]: From 751b5f3027c1d22c6250c4a83d602ffca3ade74b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Dec 2020 00:50:48 -0300 Subject: [PATCH 03/14] refactor duplicate code --- lbry/wallet/ledger.py | 189 ++++++++++++++---------------------------- 1 file changed, 60 insertions(+), 129 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 89b95686b..b2dd8e59a 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -552,11 +552,11 @@ class Ledger(metaclass=LedgerRegistry): to_request[i] = (txid, remote_height) log.debug( - "request %i transactions, %i/%i for %s are already synced", len(to_request), - len(pending_synced_history), len(remote_history), address + "request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), + len(remote_history), address ) remote_history_txids = set(txid for txid, _ in remote_history) - requested_txes = await self._request_transaction_batch(to_request, remote_history_txids, address) + requested_txes = await self.request_synced_transactions(to_request, remote_history_txids, address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) @@ -647,7 +647,7 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): + async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): header_cache = {} batches = [[]] remote_heights = {} @@ -667,138 +667,69 @@ class Ledger(metaclass=LedgerRegistry): if not batches[-1]: batches.pop() - async def _single_batch(batch): - if session_override: - batch_result = await self.network.get_transaction_batch( - batch, restricted=False, session=session_override - ) - else: - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) - for txid, (raw, merkle) in batch_result.items(): - remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) - tx.height = remote_height - cache_item.tx = tx - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - transactions.append(tx) - for batch in batches: - await _single_batch(batch) + transactions.extend(await self._single_batch(batch, remote_heights, header_cache)) return transactions - async def _request_transaction_batch(self, to_request, remote_history, address): - header_cache = {} - batches = [[]] - remote_heights = {} - synced_txs = [] - pending_sync = [] - heights_in_batch = 0 - last_height = 0 - for idx in sorted(to_request): - txid = to_request[idx][0] - height = to_request[idx][1] - remote_heights[txid] = height - if txid not in self._tx_cache: - self._tx_cache[txid] = TransactionCacheItem() - elif self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: - log.warning("has: %s", txid) - pending_sync.append(self._tx_cache[txid].tx) - continue - if height != last_height: - heights_in_batch += 1 - last_height = height - if len(batches[-1]) == 100 or heights_in_batch == 20: - batches.append([]) - heights_in_batch = 1 - batches[-1].append(txid) - if not batches[-1]: - batches.pop() + async def request_synced_transactions(self, to_request, remote_history, address): + pending_sync = await self.request_transactions(((txid, height) for txid, height in to_request.values())) + await asyncio.gather(*(self._sync(tx, remote_history) for tx in pending_sync)) + return pending_sync - last_showed_synced_count = 0 - - async def _single_batch(batch): - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) - for txid, (raw, merkle) in batch_result.items(): - log.warning("arrived batch %s", txid) - remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] - tx = Transaction(unhexlify(raw), height=remote_height) - tx.height = remote_height - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - self._tx_cache[txid].tx = tx - pending_sync.append(tx) - - async def __sync(tx): - check_db_for_txos = [] - log.warning("%s", tx.id) - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - if txi.txo_ref.tx_ref.id not in remote_history: - continue - if txi.txo_ref.tx_ref.id in self._tx_cache: - continue + async def _single_batch(self, batch, remote_heights, header_cache): + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + transactions = [] + for txid, (raw, merkle) in batch_result.items(): + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + tx = Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) else: - check_db_for_txos.append(txi.txo_ref.id) + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + self._tx_cache[txid].tx = tx + transactions.append(tx) + return transactions - referenced_txos = {} if not check_db_for_txos else { - txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True - ) - } + async def _sync(self, tx, remote_history): + check_db_for_txos = [] + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + if txi.txo_ref.tx_ref.id in self._tx_cache: + continue + else: + check_db_for_txos.append(txi.txo_ref.id) - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - if txi.txo_ref.tx_ref.id not in remote_history: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref - continue - wanted_txid = txi.txo_ref.tx_ref.id - if wanted_txid in self._tx_cache: - log.warning("waiting on %s", wanted_txid) - self.pending += 1 - log.warning("total pending %s", self.pending) - await self._tx_cache[wanted_txid].has_tx.wait() - log.warning("got %s", wanted_txid) - self.pending -= 1 - log.warning("total pending %s", self.pending) - txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref + referenced_txos = {} if not check_db_for_txos else { + txo.id: txo for txo in await self.db.get_txos( + txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + ) + } - synced_txs.append(tx) - nonlocal last_showed_synced_count - if last_showed_synced_count + 100 < len(synced_txs): - log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) - last_showed_synced_count = len(synced_txs) - for batch in batches: - await _single_batch(batch) - await asyncio.gather(*(__sync(tx) for tx in pending_sync)) - return synced_txs + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + if txi.txo_ref.tx_ref.id not in remote_history: + continue + referenced_txo = referenced_txos.get(txi.txo_ref.id) + if referenced_txo is not None: + txi.txo_ref = referenced_txo.ref + continue + wanted_txid = txi.txo_ref.tx_ref.id + if wanted_txid in self._tx_cache: + await self._tx_cache[wanted_txid].has_tx.wait() + txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref + return tx async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: details = await self.db.get_address(address=address) @@ -871,7 +802,7 @@ class Ledger(metaclass=LedgerRegistry): if len(outputs.txs) > 0: txs: List[Transaction] = [] if session_override: - txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs), session_override))) + txs.extend((await self.request_transactions(tuple(outputs.txs), session_override))) else: txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) From 00713c0d11aa6b447d9436ca3355fc25a062cff3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Dec 2020 03:12:11 -0300 Subject: [PATCH 04/14] asyncgens --- lbry/wallet/ledger.py | 103 +++++++++++++++++------------------------- 1 file changed, 41 insertions(+), 62 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index b2dd8e59a..111bee067 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -556,10 +556,14 @@ class Ledger(metaclass=LedgerRegistry): len(remote_history), address ) remote_history_txids = set(txid for txid, _ in remote_history) - requested_txes = await self.request_synced_transactions(to_request, remote_history_txids, address) - for tx in requested_txes: + async for tx in self.request_synced_transactions(to_request, remote_history_txids, address): pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) + if len(synced_txs) >= 100: + await self._save_sync_batch(synced_txs, address) + log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request)) + await self._save_sync_batch(synced_txs, address) + log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request)) assert len(pending_synced_history) == len(remote_history), \ f"{len(pending_synced_history)} vs {len(remote_history)}" @@ -570,6 +574,7 @@ class Ledger(metaclass=LedgerRegistry): if f"{txid}:{height}:" != pending_synced_history[i]: log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i]) synced_history += pending_synced_history[i] + await self.db.set_address_history(address, synced_history) cache_size = self.config.get("tx_cache_size", 10_000) for txid, cache_item in updated_cached_items.items(): @@ -578,13 +583,6 @@ class Ledger(metaclass=LedgerRegistry): log.warning("config value tx cache size %i needs to be increased", cache_size) cache_item.pending_verifications = 0 - await self.db.save_transaction_io_batch( - requested_txes, address, self.address_to_hash160(address), synced_history - ) - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in requested_txes - ]) if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -647,88 +645,68 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): - header_cache = {} + async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...]): batches = [[]] remote_heights = {} - transactions = [] - heights_in_batch = 0 - last_height = 0 for txid, height in sorted(to_request, key=lambda x: x[1]): remote_heights[txid] = height - if height != last_height: - heights_in_batch += 1 - last_height = height - if len(batches[-1]) == 100 or heights_in_batch == 20: + if len(batches[-1]) == 100: batches.append([]) - heights_in_batch = 1 batches[-1].append(txid) if not batches[-1]: batches.pop() for batch in batches: - transactions.extend(await self._single_batch(batch, remote_heights, header_cache)) - return transactions + async for tx in self._single_batch(batch, remote_heights): + yield tx async def request_synced_transactions(self, to_request, remote_history, address): - pending_sync = await self.request_transactions(((txid, height) for txid, height in to_request.values())) - await asyncio.gather(*(self._sync(tx, remote_history) for tx in pending_sync)) - return pending_sync + async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): + await self._sync(tx, remote_history) + yield tx - async def _single_batch(self, batch, remote_heights, header_cache): - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) - transactions = [] + async def _save_sync_batch(self, synced_txs, address): + await self.db.save_transaction_io_batch( + synced_txs, address, self.address_to_hash160(address), "" + ) + while synced_txs: + self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop())) + + async def _single_batch(self, batch, remote_heights): + restricted = max(remote_heights.values()) > (self.network.remote_height - 10) + restricted = restricted or min(remote_heights.values()) < 0 + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, restricted) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] - merkle_height = merkle['block_height'] tx = Transaction(unhexlify(raw), height=remote_height) - tx.height = remote_height - if 'merkle' in merkle and remote_heights[txid] > 0: - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - try: - header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) - except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) - else: - header_cache[remote_heights[txid]] = header - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - self._tx_cache[txid].tx = tx - transactions.append(tx) - return transactions + await self.maybe_verify_transaction(tx, remote_height, merkle) + yield tx async def _sync(self, tx, remote_history): - check_db_for_txos = [] + check_db_for_txos = {} for txi in tx.inputs: if txi.txo_ref.txo is not None: continue - if txi.txo_ref.tx_ref.id not in remote_history: - continue - if txi.txo_ref.tx_ref.id in self._tx_cache: + wanted_txid = txi.txo_ref.tx_ref.id + if wanted_txid not in remote_history: continue + if wanted_txid in self._tx_cache: + txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref else: - check_db_for_txos.append(txi.txo_ref.id) + check_db_for_txos[txi] = txi.txo_ref.id referenced_txos = {} if not check_db_for_txos else { txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + txoid__in=check_db_for_txos.values(), order_by='txo.txoid', no_tx=True ) } - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - if txi.txo_ref.tx_ref.id not in remote_history: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref - continue - wanted_txid = txi.txo_ref.tx_ref.id - if wanted_txid in self._tx_cache: - await self._tx_cache[wanted_txid].has_tx.wait() - txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref + for txi in check_db_for_txos: + if txi.txo_ref.id in referenced_txos: + txi.txo_ref = referenced_txos[txi.txo_ref.id].ref + else: + log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) return tx async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: @@ -802,7 +780,8 @@ class Ledger(metaclass=LedgerRegistry): if len(outputs.txs) > 0: txs: List[Transaction] = [] if session_override: - txs.extend((await self.request_transactions(tuple(outputs.txs), session_override))) + async for tx in self.request_transactions(tuple(outputs.txs), session_override): + txs.append(tx) else: txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) From 26dab04c9e6e5bc82b8dccf4cc18d78e7876d5ae Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Dec 2020 13:34:47 -0300 Subject: [PATCH 05/14] checkpoint --- lbry/wallet/checkpoints.py | 141 +++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/lbry/wallet/checkpoints.py b/lbry/wallet/checkpoints.py index 3b33bd5fb..623369317 100644 --- a/lbry/wallet/checkpoints.py +++ b/lbry/wallet/checkpoints.py @@ -740,4 +740,145 @@ HASHES = { 738000: 'aebdf15b23eb7a37600f67d45bf6586b1d5bff3d5f3459adc2f6211ab3dd0bcb', 739000: '3f5a894ac42f95f7d54ce25c42ea0baf1a05b2da0e9406978de0dc53484d8b04', 740000: '55debc22f995d844eafa0a90296c9f4f433e2b7f38456fff45dd3c66cef04e37', + 741000: '927b47fc909b4b55c067bbd75d8638af1400fac076cb642e9500a747d849e458', + 742000: '97fa3d83eb94114496e418c118f549ebfb8f6d123d0b40a12ecb093239557646', + 743000: '482b66d8d5084703079c28e3ae69e5dee735f762d6fcf9743e75f04e139fd181', + 744000: 'f406890d5c70808a58fb14429bad812a3185bdb9dace1aa57de76663f92b5013', + 745000: '2bd0802cbb8aa4441a159104d39515a4ff6fc8dfe616bc83e88197847c78bcff', + 746000: '24d090a7b6359db3d5d714a69ddc9a6f2e8ff8f044b723220a8ba32df785fd54', + 747000: '07c4ce9ce5310ee472cf753ddb03c39c5fee6c910d491daffd38615205411633', + 748000: 'ea913798c0f09d0a27eae7c852954c2c88b8c3b7f23f8fba26b68a3952d0ffde', + 749000: '23f256adebfe35d49ba84ad49f3f71fc67f7745091c91f22e65f1cc2e23b8f2c', + 750000: '96db12ee3a295f3d5c56d244e6e7493f58c08d3427e379940e5d4f891a41ec26', + 751000: 'cedaf12415dac1314942e58ced80830b92fbfabc41f42a0b0f054f0672ef9822', + 752000: '293606bcd9fbbee5584724301b2cf86bb69204820023e1fb46c238ddfbc660ab', + 753000: 'f4d43cbb38b7d97919dedc0f5a6dc8007896c4f443b76f3e5693e25bc46760cf', + 754000: 'fcaad22fd815311280fe451086516375d1d9d92b2990c7c351407df5aa19011e', + 755000: 'b9276f10d1844cb5b0308766c8db960490ac34a73c4653d0a91202789a6ccb9b', + 756000: '2fe5581f1110c1c8dcea46cad647551bd6bd640cb37738d863e189bd8f368347', + 757000: 'b9d915f366f0b010429a52245b0fb02774157eb9fd8f66bce32dcd3acc71c2a1', + 758000: '62d1854fc15db56b5d0e05ceeb54c1297966bf9dc7f7a0a14b42c059fc485d1b', + 759000: 'f4ca9f69d16d092f4a0ea5102e6343b21204c4ea9cd9b22cddd77dbb5d68ade3', + 760000: 'df3bb86641330d8cc7f55a2fd0da28251219e95babe960a308b18e08a7d88fc8', + 761000: 'a93029475de4bc7569b6ae802d658cd91c84cc253772712a279f140a6c3b91b1', + 762000: '307e289dc6ec8bcd62ca8831e4159d5edd780f2fae55ba55dd446225450f46f8', + 763000: '293f73514abca24f374473bd0394179812952a04ea13dc60ef5ada5331fa274f', + 764000: 'dd8b082db9281e3d9bacf15d6b352fda186d2d2923c7731844d0d4764dd71db8', + 765000: '201239e562d2571bf47347b3522fff89632aecea3b2d8cef05151f88b2b0bcdb', + 766000: '4a55a538b51b5650979e64521998cd5c5ad055ba9f3ac0e3e2a28febc6cc2798', + 767000: '3916666f2adbb05ea98ec1961f9546b9afa0f6910ec95e42ce37267f2ae4f79c', + 768000: 'dc0ad881eedcb5fd4954238f462080d6e7636b058d481698ed1c077e0ce2207e', + 769000: 'eaf10a1e1ec6e129289b8479a05df03e0808f1f0946f1995de6524e9ebe7a461', + 770000: '7200c64f22e32de7f999583361c933680fc9a2ffcb9a5ab73d3076fd49ec7537', + 771000: 'd883111a2eeacff80ce31df35ab6c943805b9e48877b413fccf371e5dbfa7fb2', + 772000: '3977d3c60edb9c80c97bb2b759b1659cbb650ad2d3a6f61d2caec83f1b2ae84c', + 773000: '9c7175fb8646a1a82383b4c534fd01bcf92d65c43d87ae854d51a784b04dc77e', + 774000: 'e0e92485f86e5fffa87b3497424e43b02a37710517d9d3f272392e8cdc56e5e9', + 775000: '6395229113d3aa2105afbaeb8b59621a536fc61fe272314b2fc3bdda98dd66cc', + 776000: 'b4b00207328b5f032bd4f0b634f91323ff520ada8c8bfec241b23c8e4bfd5a4e', + 777000: '14cdc6f5f7b4bd5bad745dfe6fcd114e9194026412a2e1b3f345be2eef433d16', + 778000: 'd3cd7b68be504c32117b670d38d59d44b02dcf3d65811efc2ca5531d902623cc', + 779000: 'afcd220e4040cb5f92d4b38fc204e59822df2218f767f2c4b33597b238a35f77', + 780000: '78252a9cfc289a70192ed8dd3dddeb1b9a4f9b8eff9a5d0ac259b3254472cf68', + 781000: '02ebc3f17d947481a311b4771c254f1e002b6a9198d4a5258ce6c13165aadddc', + 782000: '8dd9f1f372ee6d688a0bcdc3b342c77804ba5a646a218be4bc2aa02d846206c0', + 783000: 'e46b0d02ec2ef488fae455665e107520e1bd2b4f35ca52af7ad8addd2f72fa73', + 784000: '9ee8a8de94231e3ae3a610b82fdbca48dc14d9b80791d20af6c365a31822df6f', + 785000: '21e1cc12def8173a50158b2833bd91a62140c61646f5e08aecaee3e6da20735e', + 786000: 'b3e659f84d73de42888cc0f2b69bae71dd5fa6756a437a4b21958b182faa316e', + 787000: 'a9be7ba00ea6a9ea6bd03d8412ec014ca7e8cda6bdc33382f165e702811b8836', + 788000: 'a4c14729f8a68c03f5a0ccd890ac6a92b39c143f1f752fe81ad051eb52d8dce0', + 789000: '5cf66d224e5645097efc9c3c0392b51c8ca8ea1295151921a7912a2f04ee1274', + 790000: '676769ade71c33bc102bce416e66eb2c6794b03d7b8f5a590c87c380da463775', + 791000: '0228e074451797bf6bfbc941bcafcbadc972d32e4e1e0c5da015513f65714217', + 792000: '0fa3d00a1f19c5ac060e10a410cf7cea18eac5f89018d79ce51ac3fc66bbb365', + 793000: '5f68d0868b424e32f5ce3d8e7d9f18979da7b831b8ef4e3974d62fb20ff53a97', + 794000: '34508c56423739c00a837801b654b07decb274d02b383eff396d23c4d64bc0e9', + 795000: '7f70910c855d1fd88cd7f9be8a3b94314ee408a31a2da6301404bf8deb07c12c', + 796000: 'b74ab8813b1d2a0967fea0e66597572e5f0b5a285e21f5150fcc9d5f757de130', + 797000: 'bba27b1491d907ab1baa456cb651dc5b071231b1b6ad27b62d351ca12c25dbfd', + 798000: 'e75dcb15b2fc91f02e75e600dde9f6f46c09672533bc82a5d6916c4a2cd8613a', + 799000: 'adf62c826a3e0b33af439a7881918ae4ce19c5fb2ca37d21243415f7d716aa65', + 800000: 'd8f0ca13a8c8a19c254a3a6ba15150a34711dca96f2d877162cc44aa2acfb268', + 801000: '2a8c7104c4040a2bc31913ae25e9361df5bac9477368c708f86c1ca640480887', + 802000: '1f3b09d3561c4a8a056b263289bd492dc6c0d604c3fa195935e735d1c0ddc40e', + 803000: '037769628c40a701fdb4b16d79084b8fbb319fde79770a7ac842f3cdc813099e', + 804000: 'a0c6a089e5fa1e3589ca282085fe7201a5705776d81b257ffd252b2947fa6428', + 805000: 'b2ac99bfc4a488e7b7624b31ee061991a6dd0881bb005cd13f3dd2e66a08fe19', + 806000: 'ffe63cb999a278280b80a667d2dcb60c40e43a53f733914d8bec808b694ebf83', + 807000: 'eddb09fc6c4869a59b520d0befb1fb6ac952333f3cc5de086539c85ea8558778', + 808000: '0f4fb3f9172e52897ea992d9f3a2024126c4d2e63e9888739f11fb1f5e4c1f46', + 809000: '9641dd720d23ced2f1cb6e5cf46ac4e547afb9f56263c4cf58e3b19d407cf401', + 810000: 'de6dc953acd7e5ef213b3aaf1c4a9ee1d5b756bfce5525ee105214647e243a85', + 811000: 'c52c83712ca12b24b2db1b4a575e7f352b1d560cbf702e121a03bdca9e8be23d', + 812000: '83143734bb965318a53a38a7e403dcdb3e3fadedb01ab12c370417fc2a0655c0', + 813000: 'e480deff10c5a84fc957e3aed936690e24b74dd08fa8858a8a953c2f7383b914', + 814000: '810d33afcee07b9abe16c6cdc3a041038daa131c476b0daf48a080007f08b490', + 815000: 'b4aeb9e16fddd27844b2d56bc2b221134039bb5642c9e9ba88372afbdeac3972', + 816000: '86e73b67aae3d248011b8f66ed414cb8a9ba4b2a3cf7e32773cfbff055d719b7', + 817000: '3ebb8b83752b48242016cb682f0f6bd14e15371bf1163a5933193eaa0edeb351', + 818000: '4d925e17f642f220bbf317d3d5355d2f41fbce325f190f8c3b32dc0b337d24d6', + 819000: 'b9cc126d620f6b99d90a00d35957b0e428aaaa7c986bc9e816a60e4334572961', + 820000: '9c2f8c142bed1f94dca29276f7c83958be8cfe11773bb9b56c808fbcf7d3b1f8', + 821000: 'e5509eb98895cfa12a8da5d54c1df3f52472ffcbdf707adbf84a4a9c5d356203', + 822000: '764aada4802ebfe4ef935ab50af06a4f83aa556c49fdde3d9e12e1abd230c16b', + 823000: '1dbd745c2e96a365d865f990d109137d32d42977f503af55d8c00b109d31d3c3', + 824000: '954304a0b0c8f549c3bffd5ff46b5b8f05b0f0fde2a36f24fd5af9d774fb3079', + 825000: '17808b14f2056c1a5d46cb7617e9de9be6a1a6084edbc1bdb778586467a72297', + 826000: '3ca1167d4cac8b187829b23001b438617c43704b42462c4eb001b0d434cb9651', + 827000: '246d1607245e4a202f420393ac2e30e9cbf5eb5570dc997073b897f6d8643023', + 828000: '1764730a8dc3e89d02d168ff6bb54e8c903820b74711af6ff27bd0c8545577e7', + 829000: 'd9f3ab0cd823c6305bd8b95a96188bb4f2ca90b4d66c5d12293e8b6192bac0f2', + 830000: 'd4ff51f0092b04aedf8d39937680d8e8309b1be21d36e7833ed36f8e30aad6ea', + 831000: '3e92e76721b962396dce52993fa7606552f0907b38f7b2bd7b21ada98c145f47', + 832000: 'df12fcdb4cbe53ba627ace6de898298de175f8671d3d90170732d110fcdc34b8', + 833000: '25167ff38ae4a5964b618cabe0a12d4de62ac7a4c47448cdb4499e09e108d5b9', + 834000: 'd31f5309ea179a1e386e835fc372e47dcda6871a3a239abfba50c4f368994f13', + 835000: 'aff7e8dd3e55ea807fcbe284014075f420b3a23f1b0eb47bacdc1c91d2899813', + 836000: '3b5ac6d64c470739bb17d1544a285affb40f2d33e92687e5ba7c5ac602e0d72a', + 837000: 'd5619cbfe4f27c55f2bf9351b4891636cf64fef88212a5eeeae7bd3de47fe0bd', + 838000: '1f9102a49c6ac470cb5d0050e5300b1443840d6d65719b835e3bea484aafb2ec', + 839000: '3f63e391f0fbc5787fbe4ace3bada3816261294ea1c6ee435001801023682f90', + 840000: '777894fd12bd0d6dee7bcde2995c68e55e7094e3122da38571e4b6c4304b75e0', + 841000: 'ceb0c598c788e25e43e25aa4beff5c7377035824844cf1675eaea537074df028', + 842000: '8661cf2065dc713d2ba043f0b81f0effcc940eeb3e91906a21ff22c210561dcd', + 843000: '0dc2766f90415009d0c86bedffee6ebcf58042eb08262c0c67c4e9ed86b2aec8', + 844000: '26d072da864cab268a12794977b04ec44fb69ef3978e2342e82225974dac54dd', + 845000: '95e93bb60be8d5f07a1f4d26290c914957a82fc9d26ae8a3f20082eda27406ff', + 846000: 'f1bdc39af7705e58ab8b6c31dc70dce1e115db1cfd8cc9b037949dfbec82a59a', + 847000: 'f5f10f06396ecf2765d8a081141d489737c1d8d57c281f28f57c4cb2f90db883', + 848000: '331b8ef08605bae8d749893af9ed54f0df4f07a5a002108a2a0aea82d0360979', + 849000: '75b5f6233ab9a1bbc3c8b2893e5b22a0aa98e7ea635261255dc3c281f67d2260', + 850000: '5d7e6fe83e0ea1910a54a00090704737671d6f44df4228e21440ad1fc15e595f', + 851000: '7822db25d3ff0f6695ee38bad91edf317b5c6611673d28f1d22053110bb558be', + 852000: '2f0effad83a3561fc1a2806a562786a641d9ddb18d16bb9308006e7d324a21e9', + 853000: 'f603b2eaff11d5296377d990651317d40a1b2599ad2c5250eab131090f4b9458', + 854000: '34d59b26a50f18a9f250736d0f2e69d28b7e196fbef9b8a26c6b0b75c16aa194', + 855000: '76dd1ffff3946c0878969886fcf177ce5ab5560df19ddf006f9bcb02ae3e4e4f', + 856000: '74ff0b6f64e9dd5802fec2aac1d3ae194d28b9264114adaf0a882b46c8c918fe', + 857000: '7b5badfa2e4f40aa597a504d7ebe83c3705a2c6169a8c168ce293db223bc2d32', + 858000: '2bb0767a0f72b20d45ecfc3e34517dbda16d85758e040cf0e147f4cbd0cc57ac', + 859000: '3d741b9c365a91ed76f85824b94d19ec19b608d232660840ba59c7aa4b2cb67f', + 860000: 'd481a5a117878c0e3acd1f5844e150fb30e617577947d9846b1d214d703b71b0', + 861000: '54033424e488a3f1ad6946d4a6d9acb48465d6b1dbe8e1c2504a54cc84d7cad4', + 862000: '464bc3820a8cc8844dc9e26c388009e9982c656d46ef4b4fd0a2cb0e4eea0aaa', + 863000: 'd1aa94be2174f66780c4f226b9da3f6712b0f37af8dec33360bea83ca261b342', + 864000: '8c16008f11de5bc395d88cd802514ff647450f1bc136724b9aaf2ccce10a494f', + 865000: '3dae86012e97a201e2e1a47c899001ac00f78dc108026ed7c4194858c6c6dd5a', + 866000: 'afe5b0ccab995e1a1fa25fbc24c1d4b1a92c43042d03395f8743dcd806e72fd8', + 867000: 'c83716ac171aa9ab0d414833db340fa30e82bfda6cc616d3038529caab9b5600', + 868000: '8c409fe03cd35ef2d8e366818788b40eaeb4c8f6ae91450d75f4a66ca5f69cad', + 869000: '1d47909ceba790b8e1ce2e9902ee2775ea99e58efdb95668f9803a8ccf95f286', + 870000: '9adf5da1476388f053aa42de636da169d1cf1c9652cdf7cd9ad4fb18a0eb3388', + 871000: '8ad57fb1e74bcba0b5614fbac003be2bb32275dd85b38f2d28a0585005a99cfc', + 872000: '84a32e92012a356106e9657da8dab1a5491ea588fc29d411c69b20680c666420', + 873000: 'adf5921bbbfaa43929f67e6a070975313b77b456e262c700a27be611fceb17ae', + 874000: '09eaa7c4b18c79a46a2895190333f72336826223d5c986849a06f5153f49f2a5', + 875000: '235d7e4f31966507312149ea4c5e294aa84c695cf840117f0ef5963be7a0bda1', + 876000: '9aa9cb806ccbec0475ac330b496c5b2edeba38ba3f1e13ddd54a01457634a288', + 877000: 'c1e7f9b2b20bb1c4c0deadbc786d31fdf36f262325342aa23d1a66e2846b22bc', + 878000: 'ee0d2b20ac28ce23ab38698a57c6beff14f12b7af9d027c05cc92f652695f46b', + 879000: '0eb0810f4b81d1845b0a88f05449408df2e45715c9210a656f45278c5fdf7956', + 880000: 'e7d613027e3b4ca38d09bbef07998b57db237c6d67f1e8ea50024d2e0d9a1a72', + 881000: '21af4d355d8756b8bf0369b2d79b5c824148ae069026ba5c14f9dd6b7555e1db', } From 3b9e312615001bf2b22fc653392d93c1a39f6371 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Dec 2020 13:35:43 -0300 Subject: [PATCH 06/14] fix verification --- lbry/wallet/ledger.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 111bee067..9d368b2d5 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -560,9 +560,7 @@ class Ledger(metaclass=LedgerRegistry): pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) if len(synced_txs) >= 100: - await self._save_sync_batch(synced_txs, address) log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request)) - await self._save_sync_batch(synced_txs, address) log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request)) assert len(pending_synced_history) == len(remote_history), \ @@ -574,7 +572,11 @@ class Ledger(metaclass=LedgerRegistry): if f"{txid}:{height}:" != pending_synced_history[i]: log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i]) synced_history += pending_synced_history[i] - await self.db.set_address_history(address, synced_history) + await self.db.save_transaction_io_batch( + synced_txs, address, self.address_to_hash160(address), synced_history + ) + while synced_txs: + self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop())) cache_size = self.config.get("tx_cache_size", 10_000) for txid, cache_item in updated_cached_items.items(): @@ -636,10 +638,12 @@ class Ledger(metaclass=LedgerRegistry): cached = TransactionCacheItem() self._tx_cache[tx.id] = cached cached.tx = tx - if 0 < remote_height < len(self.headers) and cached.pending_verifications <= 1: + if 0 < remote_height < len(self.headers): # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case if not merkle: merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height) + if 'merkle' not in merkle: + return merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) header = await self.headers.get(remote_height) tx.position = merkle['pos'] @@ -662,21 +666,14 @@ class Ledger(metaclass=LedgerRegistry): yield tx async def request_synced_transactions(self, to_request, remote_history, address): + pending_sync = [] async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): - await self._sync(tx, remote_history) + pending_sync.append(asyncio.ensure_future(self._sync(tx, remote_history))) yield tx - - async def _save_sync_batch(self, synced_txs, address): - await self.db.save_transaction_io_batch( - synced_txs, address, self.address_to_hash160(address), "" - ) - while synced_txs: - self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop())) + await asyncio.gather(*pending_sync) async def _single_batch(self, batch, remote_heights): - restricted = max(remote_heights.values()) > (self.network.remote_height - 10) - restricted = restricted or min(remote_heights.values()) < 0 - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, restricted) + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] tx = Transaction(unhexlify(raw), height=remote_height) @@ -692,13 +689,14 @@ class Ledger(metaclass=LedgerRegistry): if wanted_txid not in remote_history: continue if wanted_txid in self._tx_cache: + await self._tx_cache[wanted_txid].has_tx.wait() txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref else: check_db_for_txos[txi] = txi.txo_ref.id referenced_txos = {} if not check_db_for_txos else { txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos.values(), order_by='txo.txoid', no_tx=True + txoid__in=list(check_db_for_txos.values()), order_by='txo.txoid', no_tx=True ) } From 21a2e677554bf854731d2cdd08b8ecbf15230176 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 20 Dec 2020 21:06:08 -0300 Subject: [PATCH 07/14] fix rebase --- lbry/wallet/ledger.py | 69 +++++++++++++++--------------------------- lbry/wallet/network.py | 4 +-- 2 files changed, 27 insertions(+), 46 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 9d368b2d5..8d0dfd251 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -770,18 +770,13 @@ class Ledger(metaclass=LedgerRegistry): include_is_my_output=False, include_sent_supports=False, include_sent_tips=False, - include_received_tips=False, - session_override=None) -> Tuple[List[Output], dict, int, int]: + include_received_tips=False) -> Tuple[List[Output], dict, int, int]: encoded_outputs = await query outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? - txs = [] + txs: List[Transaction] = [] if len(outputs.txs) > 0: - txs: List[Transaction] = [] - if session_override: - async for tx in self.request_transactions(tuple(outputs.txs), session_override): - txs.append(tx) - else: - txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) + async for tx in self.request_transactions(tuple(outputs.txs)): + txs.append(tx) _txos, blocked = outputs.inflate(txs) @@ -856,25 +851,17 @@ class Ledger(metaclass=LedgerRegistry): async def resolve(self, accounts, urls, new_sdk_server=None, **kwargs): txos = [] urls_copy = list(urls) - if new_sdk_server: resolve = partial(self.network.new_resolve, new_sdk_server) - while urls_copy: - batch, urls_copy = urls_copy[:500], urls_copy[500:] - txos.extend( - (await self._inflate_outputs( - resolve(batch), accounts, **kwargs - ))[0] - ) else: - async with self.network.single_call_context(self.network.resolve) as (resolve, session): - while urls_copy: - batch, urls_copy = urls_copy[:500], urls_copy[500:] - txos.extend( - (await self._inflate_outputs( - resolve(batch), accounts, session_override=session, **kwargs - ))[0] - ) + resolve = partial(self.network.retriable_call, self.network.resolve) + while urls_copy: + batch, urls_copy = urls_copy[:100], urls_copy[100:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, **kwargs + ))[0] + ) assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." result = {} @@ -896,17 +883,13 @@ class Ledger(metaclass=LedgerRegistry): new_sdk_server=None, **kwargs) -> Tuple[List[Output], dict, int, int]: if new_sdk_server: claim_search = partial(self.network.new_claim_search, new_sdk_server) - return await self._inflate_outputs( - claim_search(**kwargs), accounts, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output, - ) - async with self.network.single_call_context(self.network.claim_search) as (claim_search, session): - return await self._inflate_outputs( - claim_search(**kwargs), accounts, session_override=session, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output, - ) + else: + claim_search = self.network.claim_search + return await self._inflate_outputs( + claim_search(**kwargs), accounts, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: for claim in (await self.claim_search(accounts, claim_id=claim_id, **kwargs))[0]: @@ -1011,7 +994,7 @@ class Ledger(metaclass=LedgerRegistry): return self.db.get_channel_count(**constraints) async def resolve_collection(self, collection, offset=0, page_size=1): - claim_ids = collection.claim.collection.claims.ids[offset:page_size+offset] + claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset] try: resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids) except Exception as err: @@ -1060,7 +1043,7 @@ class Ledger(metaclass=LedgerRegistry): 'txid': tx.id, 'timestamp': ts, 'date': datetime.fromtimestamp(ts).isoformat(' ')[:-3] if tx.height > 0 else None, - 'confirmations': (headers.height+1) - tx.height if tx.height > 0 else 0, + 'confirmations': (headers.height + 1) - tx.height if tx.height > 0 else 0, 'claim_info': [], 'update_info': [], 'support_info': [], @@ -1070,7 +1053,7 @@ class Ledger(metaclass=LedgerRegistry): is_my_inputs = all([txi.is_my_input for txi in tx.inputs]) if is_my_inputs: # fees only matter if we are the ones paying them - item['value'] = dewies_to_lbc(tx.net_account_balance+tx.fee) + item['value'] = dewies_to_lbc(tx.net_account_balance + tx.fee) item['fee'] = dewies_to_lbc(-tx.fee) else: # someone else paid the fees @@ -1093,13 +1076,13 @@ class Ledger(metaclass=LedgerRegistry): if txi.txo_ref.txo is not None: other_txo = txi.txo_ref.txo if (other_txo.is_claim or other_txo.script.is_support_claim) \ - and other_txo.claim_id == txo.claim_id: + and other_txo.claim_id == txo.claim_id: previous = other_txo break if previous is not None: item['update_info'].append({ 'address': txo.get_address(self), - 'balance_delta': dewies_to_lbc(previous.amount-txo.amount), + 'balance_delta': dewies_to_lbc(previous.amount - txo.amount), 'amount': dewies_to_lbc(txo.amount), 'claim_id': txo.claim_id, 'claim_name': txo.claim_name, @@ -1177,7 +1160,7 @@ class Ledger(metaclass=LedgerRegistry): for account in accounts: balance = self._balance_cache.get(account.id) if not balance: - balance = self._balance_cache[account.id] =\ + balance = self._balance_cache[account.id] = \ await account.get_detailed_balance(confirmations, reserved_subtotals=True) for key, value in balance.items(): if key == 'reserved_subtotals': @@ -1187,7 +1170,6 @@ class Ledger(metaclass=LedgerRegistry): result[key] += value return result - class TestNetLedger(Ledger): network_name = 'testnet' pubkey_address_prefix = bytes((111,)) @@ -1196,7 +1178,6 @@ class TestNetLedger(Ledger): extended_private_key_prefix = unhexlify('04358394') checkpoints = {} - class RegTestLedger(Ledger): network_name = 'regtest' headers_class = UnvalidatedHeaders diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 939a7c880..19e7166c1 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -260,9 +260,9 @@ class Network: restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 return self.rpc('blockchain.transaction.get', [tx_hash], restricted) - def get_transaction_batch(self, txids, restricted=True, session=None): + def get_transaction_batch(self, txids, restricted=True): # use any server if its old, otherwise restrict to who gave us the history - return self.rpc('blockchain.transaction.get_batch', txids, restricted, session) + return self.rpc('blockchain.transaction.get_batch', txids, restricted) def get_transaction_and_merkle(self, tx_hash, known_height=None): # use any server if its old, otherwise restrict to who gave us the history From eb2a4aebba82895fc1ef6012d7ea7c2fb706bca4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 21 Dec 2020 16:49:08 -0300 Subject: [PATCH 08/14] unrestricted and reusing verified cache hits --- lbry/wallet/ledger.py | 50 ++++++++++---------------------- tests/unit/wallet/test_ledger.py | 2 +- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 8d0dfd251..d2e09e546 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_ready_controller = StreamController() self.on_ready = self._on_ready_controller.stream - self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 10_000)) + self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024)) self._update_tasks = TaskGroup() self._other_tasks = TaskGroup() # that we dont need to start self._utxo_reservation_lock = asyncio.Lock() @@ -516,7 +516,6 @@ class Ledger(metaclass=LedgerRegistry): synced_txs = [] to_request = {} pending_synced_history = {} - updated_cached_items = {} already_synced = set() already_synced_offset = 0 @@ -526,10 +525,6 @@ class Ledger(metaclass=LedgerRegistry): already_synced.add((txid, remote_height)) already_synced_offset += 1 continue - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = TransactionCacheItem() - self._tx_cache[txid] = cache_item tx_indexes = {} @@ -537,18 +532,6 @@ class Ledger(metaclass=LedgerRegistry): tx_indexes[txid] = i if (txid, remote_height) in already_synced: continue - cache_item = self._tx_cache.get(txid) - cache_item.pending_verifications += 1 - updated_cached_items[txid] = cache_item - - assert cache_item is not None, 'cache item is none' - # tx = cache_item.tx - # if cache_item.tx is not None and \ - # cache_item.tx.height >= remote_height and \ - # (cache_item.tx.is_verified or remote_height < 1): - # synced_txs.append(cache_item.tx) # cached tx is already up-to-date - # pending_synced_history[i] = f'{tx.id}:{tx.height}:' - # continue to_request[i] = (txid, remote_height) log.debug( @@ -561,6 +544,12 @@ class Ledger(metaclass=LedgerRegistry): synced_txs.append(tx) if len(synced_txs) >= 100: log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request)) + await self.db.save_transaction_io_batch( + synced_txs, address, self.address_to_hash160(address), "" + ) + while synced_txs: + tx = synced_txs.pop() + self._on_transaction_controller.add(TransactionEvent(address, tx)) log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request)) assert len(pending_synced_history) == len(remote_history), \ @@ -578,27 +567,12 @@ class Ledger(metaclass=LedgerRegistry): while synced_txs: self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop())) - cache_size = self.config.get("tx_cache_size", 10_000) - for txid, cache_item in updated_cached_items.items(): - cache_item.pending_verifications -= 1 - if cache_item.pending_verifications < 0: - log.warning("config value tx cache size %i needs to be increased", cache_size) - cache_item.pending_verifications = 0 - - if address_manager is None: address_manager = await self.get_address_manager_for_address(address) if address_manager is not None: await address_manager.ensure_address_gap() - for txid, cache_item in updated_cached_items.items(): - if self._tx_cache.get(txid) is not cache_item: - log.warning("tx cache corrupted while syncing %s, reattempt sync=%s", address, reattempt_update) - if reattempt_update: - return await self.update_history(address, remote_status, address_manager, False) - return False - local_status, local_history = \ await self.get_local_status_and_history(address, synced_history) @@ -654,6 +628,12 @@ class Ledger(metaclass=LedgerRegistry): remote_heights = {} for txid, height in sorted(to_request, key=lambda x: x[1]): + if txid in self._tx_cache: + if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + yield self._tx_cache[txid].tx + continue + else: + self._tx_cache[txid] = TransactionCacheItem() remote_heights[txid] = height if len(batches[-1]) == 100: batches.append([]) @@ -673,7 +653,9 @@ class Ledger(metaclass=LedgerRegistry): await asyncio.gather(*pending_sync) async def _single_batch(self, batch, remote_heights): - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + heights = {remote_heights[txid] for txid in batch} + unrestriced = 0 < min(heights) < max(heights) < max(self.headers.checkpoints or [0]) + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, not unrestriced) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] tx = Transaction(unhexlify(raw), height=remote_height) diff --git a/tests/unit/wallet/test_ledger.py b/tests/unit/wallet/test_ledger.py index e1822e2b6..c8385c837 100644 --- a/tests/unit/wallet/test_ledger.py +++ b/tests/unit/wallet/test_ledger.py @@ -40,7 +40,7 @@ class MockNetwork: merkle = await self.get_merkle(tx_hash, known_height) return tx, merkle - async def get_transaction_batch(self, txids): + async def get_transaction_batch(self, txids, restricted): return { txid: await self.get_transaction_and_merkle(txid) for txid in txids From 6a610187e04fb5b0b348cf80f054f4ded3ef2374 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 21 Dec 2020 22:37:48 -0300 Subject: [PATCH 09/14] cache bypass --- lbry/wallet/ledger.py | 45 +++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index d2e09e546..fc9bb5b82 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -606,12 +606,6 @@ class Ledger(metaclass=LedgerRegistry): async def maybe_verify_transaction(self, tx, remote_height, merkle=None): tx.height = remote_height - cached = self._tx_cache.get(tx.id) - if not cached: - # cache txs looked up by transaction_show too - cached = TransactionCacheItem() - self._tx_cache[tx.id] = cached - cached.tx = tx if 0 < remote_height < len(self.headers): # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case if not merkle: @@ -623,17 +617,18 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...]): + async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False): batches = [[]] remote_heights = {} for txid, height in sorted(to_request, key=lambda x: x[1]): - if txid in self._tx_cache: - if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: - yield self._tx_cache[txid].tx - continue - else: - self._tx_cache[txid] = TransactionCacheItem() + if cached: + if txid in self._tx_cache: + if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + yield self._tx_cache[txid].tx + continue + else: + self._tx_cache[txid] = TransactionCacheItem() remote_heights[txid] = height if len(batches[-1]) == 100: batches.append([]) @@ -643,14 +638,16 @@ class Ledger(metaclass=LedgerRegistry): for batch in batches: async for tx in self._single_batch(batch, remote_heights): + if cached: + self._tx_cache[tx.id].tx = tx yield tx async def request_synced_transactions(self, to_request, remote_history, address): - pending_sync = [] + pending_sync = {} async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): - pending_sync.append(asyncio.ensure_future(self._sync(tx, remote_history))) + pending_sync[tx.id] = tx yield tx - await asyncio.gather(*pending_sync) + await asyncio.gather(*(self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values())) async def _single_batch(self, batch, remote_heights): heights = {remote_heights[txid] for txid in batch} @@ -662,7 +659,7 @@ class Ledger(metaclass=LedgerRegistry): await self.maybe_verify_transaction(tx, remote_height, merkle) yield tx - async def _sync(self, tx, remote_history): + async def _sync(self, tx, remote_history, pending_txs): check_db_for_txos = {} for txi in tx.inputs: if txi.txo_ref.txo is not None: @@ -670,9 +667,8 @@ class Ledger(metaclass=LedgerRegistry): wanted_txid = txi.txo_ref.tx_ref.id if wanted_txid not in remote_history: continue - if wanted_txid in self._tx_cache: - await self._tx_cache[wanted_txid].has_tx.wait() - txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref + if wanted_txid in pending_txs: + tx = pending_txs[wanted_txid] else: check_db_for_txos[txi] = txi.txo_ref.id @@ -686,8 +682,11 @@ class Ledger(metaclass=LedgerRegistry): if txi.txo_ref.id in referenced_txos: txi.txo_ref = referenced_txos[txi.txo_ref.id].ref else: - log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) - return tx + tx = await self.db.get_transaction(txid=wanted_txid) + if tx is None: + log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) + else: + txi.txo_ref = tx.outputs[txi.txo_ref.position].ref async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: details = await self.db.get_address(address=address) @@ -757,7 +756,7 @@ class Ledger(metaclass=LedgerRegistry): outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs: List[Transaction] = [] if len(outputs.txs) > 0: - async for tx in self.request_transactions(tuple(outputs.txs)): + async for tx in self.request_transactions(tuple(outputs.txs), cached=True): txs.append(tx) _txos, blocked = outputs.inflate(txs) From 4cbf4230e89c71e28d8a724a31c2ce34ee4034b3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Dec 2020 14:05:37 -0500 Subject: [PATCH 10/14] fix txi.txo_ref --- lbry/wallet/ledger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index fc9bb5b82..696442ab8 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -668,7 +668,7 @@ class Ledger(metaclass=LedgerRegistry): if wanted_txid not in remote_history: continue if wanted_txid in pending_txs: - tx = pending_txs[wanted_txid] + txi.txo_ref = pending_txs[wanted_txid].outputs[txi.txo_ref.position].ref else: check_db_for_txos[txi] = txi.txo_ref.id From 479b5d31a9f4ab69b2904062b929e6dd7dad69e0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Dec 2020 14:06:53 -0500 Subject: [PATCH 11/14] fix test --- tests/unit/wallet/test_ledger.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/unit/wallet/test_ledger.py b/tests/unit/wallet/test_ledger.py index c8385c837..276c5fa15 100644 --- a/tests/unit/wallet/test_ledger.py +++ b/tests/unit/wallet/test_ledger.py @@ -126,9 +126,7 @@ class TestSynchronization(LedgerTestCase): self.ledger.network.get_history_called = [] self.ledger.network.get_transaction_called = [] - self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified) - self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified) - self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified) + self.assertEqual(0, len(self.ledger._tx_cache)) await self.ledger.update_history(address, '') self.assertListEqual(self.ledger.network.get_history_called, [address]) self.assertListEqual(self.ledger.network.get_transaction_called, []) From 8c71b744f3717d6b3d1086f20d21ae6beb618fc6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Dec 2020 15:17:32 -0500 Subject: [PATCH 12/14] fix request_synced_transactions edge cases --- lbry/wallet/ledger.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 696442ab8..5f646d826 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -415,6 +415,7 @@ class Ledger(metaclass=LedgerRegistry): "Blockchain Reorganization: attempting rewind to height %s from starting height %s", height, height+rewound ) + self._tx_cache.clear() else: raise IndexError(f"headers.connect() returned negative number ({added})") @@ -646,8 +647,8 @@ class Ledger(metaclass=LedgerRegistry): pending_sync = {} async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): pending_sync[tx.id] = tx - yield tx - await asyncio.gather(*(self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values())) + for f in asyncio.as_completed([self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values()]): + yield await f async def _single_batch(self, batch, remote_heights): heights = {remote_heights[txid] for txid in batch} @@ -682,11 +683,12 @@ class Ledger(metaclass=LedgerRegistry): if txi.txo_ref.id in referenced_txos: txi.txo_ref = referenced_txos[txi.txo_ref.id].ref else: - tx = await self.db.get_transaction(txid=wanted_txid) - if tx is None: + tx_from_db = await self.db.get_transaction(txid=txi.txo_ref.tx_ref.id) + if tx_from_db is None: log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) else: - txi.txo_ref = tx.outputs[txi.txo_ref.position].ref + txi.txo_ref = tx_from_db.outputs[txi.txo_ref.position].ref + return tx async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: details = await self.db.get_address(address=address) From ddbae294e6635f285dbd5e66a8e24c62e099a64f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Dec 2020 16:55:33 -0500 Subject: [PATCH 13/14] skip doc test on gitlab --- .gitlab-ci.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 46aa3a845..eb69df328 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -57,11 +57,11 @@ test:other-integration: - pip install tox-travis - tox -e other -test:json-api: - stage: test - script: - - make install tools - - HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py +#test:json-api: +# stage: test +# script: +# - make install tools +# - HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py From 7a7446c8bded9b42a3ce0535bd1582f31a90adc0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Dec 2020 20:08:26 -0500 Subject: [PATCH 14/14] force resync blockchain.db --- lbry/wallet/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index aef2c6811..fb6c33648 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -430,7 +430,7 @@ class SQLiteMixin: return await self.db.executescript('\n'.join( f"DROP TABLE {table};" for table in tables - )) + ) + '\n' + 'PRAGMA WAL_CHECKPOINT(FULL);' + '\n' + 'VACUUM;') await self.db.execute(self.CREATE_VERSION_TABLE) await self.db.execute("INSERT INTO version VALUES (?)", (self.SCHEMA_VERSION,)) await self.db.executescript(self.CREATE_TABLES_QUERY) @@ -574,7 +574,7 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: L class Database(SQLiteMixin): - SCHEMA_VERSION = "1.4" + SCHEMA_VERSION = "1.5" PRAGMAS = """ pragma journal_mode=WAL;