From 03a643da5263b40eea7160b87710398e01fb2d01 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 Jul 2020 11:55:23 -0400 Subject: [PATCH 01/10] use block cache --- lbry/wallet/server/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 748060da2..8f3024665 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1529,7 +1529,7 @@ class LBRYElectrumX(SessionBase): block_hash = tx_info.get('blockhash') if not block_hash: return raw_tx, {'block_height': -1} - merkle_height = (await self.daemon_request('deserialised_block', block_hash))['height'] + merkle_height = (await self.daemon.deserialised_block(block_hash))['height'] merkle = await self.transaction_merkle(tx_hash, merkle_height) return raw_tx, merkle @@ -1592,7 +1592,7 @@ class LBRYElectrumX(SessionBase): height = non_negative_integer(height) hex_hashes = await self.daemon_request('block_hex_hashes', height, 1) block_hash = hex_hashes[0] - block = await self.daemon_request('deserialised_block', block_hash) + block = await self.daemon.deserialised_block(block_hash) return block_hash, block['tx'] def _get_merkle_branch(self, tx_hashes, tx_pos): From 7a1b7db7c82754c5eaa989e8c74c32bee292944d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 Jul 2020 11:58:23 -0400 Subject: [PATCH 02/10] support multiple blocks with `blockchain.transaction.get_batch` --- lbry/wallet/network.py | 4 ++++ lbry/wallet/server/mempool.py | 14 ++++++++++++ lbry/wallet/server/session.py | 40 +++++++++++++---------------------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 92793e9ca..262f46579 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -255,6 +255,10 @@ class Network: restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 return self.rpc('blockchain.transaction.get', [tx_hash], restricted) + def get_transaction_batch(self, txids): + # use any server if its old, otherwise restrict to who gave us the history + return self.rpc('blockchain.transaction.get_batch', txids, True) + def get_transaction_and_merkle(self, tx_hash, known_height=None): # use any server if its old, otherwise restrict to who gave us the history restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 7ca887892..e253dc7c5 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -389,3 +389,17 @@ class MemPool: if hX == hashX: utxos.append(UTXO(-1, pos, tx_hash, 0, value)) return utxos + + def get_mempool_height(self, tx_hash): + # Height Progression + # -2: not broadcast + # -1: in mempool but has unconfirmed inputs + # 0: in mempool and all inputs confirmed + # +num: confirmed in a specific block (height) + if tx_hash not in self.txs: + return -2 + tx = self.txs[tx_hash] + unspent_inputs = sum(1 if hash in self.txs else 0 for hash, idx in tx.prevouts) + if unspent_inputs: + return -1 + return 0 diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 8f3024665..9e897e29a 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1539,35 +1539,25 @@ class LBRYElectrumX(SessionBase): for tx_hash in tx_hashes: assert_tx_hash(tx_hash) batch_result = {} - height = None - block_hash = None - block = None for tx_hash in tx_hashes: tx_info = await self.daemon_request('getrawtransaction', tx_hash, True) raw_tx = tx_info['hex'] - if height is None: - if 'blockhash' in tx_info: - block_hash = tx_info['blockhash'] - block = await self.daemon_request('deserialised_block', block_hash) - height = block['height'] - else: - height = -1 - if block_hash != tx_info.get('blockhash'): - raise RPCError(BAD_REQUEST, f'request contains a mix of transaction heights') + block_hash = tx_info.get('blockhash') + merkle = {} + if block_hash: + block = await self.daemon.deserialised_block(block_hash) + height = block['height'] + try: + pos = block['tx'].index(tx_hash) + except ValueError: + raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in ' + f'block {block_hash} at height {height:,d}') + merkle["merkle"] = self._get_merkle_branch(block['tx'], pos) + merkle["pos"] = pos else: - if not block_hash: - merkle = {'block_height': -1} - else: - try: - pos = block['tx'].index(tx_hash) - except ValueError: - raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in ' - f'block {block_hash} at height {height:,d}') - merkle = { - "merkle": self._get_merkle_branch(block['tx'], pos), - "pos": pos - } - batch_result[tx_hash] = [raw_tx, merkle] + height = -1 + merkle['block_height'] = height + batch_result[tx_hash] = [raw_tx, merkle] return batch_result async def transaction_get(self, tx_hash, verbose=False): From 5a39681a2e882f4d580898ae9500d3b6dc0d163a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 7 Jul 2020 14:21:16 -0400 Subject: [PATCH 03/10] log --- lbry/wallet/ledger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index fbd90aa53..54c26f521 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -697,7 +697,7 @@ class Ledger(metaclass=LedgerRegistry): local_height, height ) return False - log.debug( + log.warning( "local history does not contain %s, requested height %i", tx.id, height ) return False From 420c9f10c297978bdc09dc5f925964a08a9269c2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 7 Jul 2020 14:50:02 -0400 Subject: [PATCH 04/10] remove _update_cache_item --- lbry/wallet/ledger.py | 44 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 54c26f521..512987f23 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -601,33 +601,31 @@ class Ledger(metaclass=LedgerRegistry): (cache_item.tx.is_verified or remote_height < 1): return cache_item.tx # cached tx is already up-to-date + cache_item.pending_verifications += 1 try: - cache_item.pending_verifications += 1 - return await self._update_cache_item(cache_item, txid, remote_height, check_local) + async with cache_item.lock: + tx = cache_item.tx + if tx is None and check_local: + # check local db + tx = cache_item.tx = await self.db.get_transaction(txid=txid) + merkle = None + if tx is None: + # fetch from network + _raw, merkle = await self.network.retriable_call( + self.network.get_transaction_and_merkle, txid, remote_height + ) + tx = Transaction(unhexlify(_raw), height=merkle['block_height']) + cache_item.tx = tx # make sure it's saved before caching it + tx.height = remote_height + if merkle and 0 < remote_height < len(self.headers): + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + header = await self.headers.get(remote_height) + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + return tx finally: cache_item.pending_verifications -= 1 - async def _update_cache_item(self, cache_item, txid, remote_height, check_local=True): - - async with cache_item.lock: - - tx = cache_item.tx - - if tx is None and check_local: - # check local db - tx = cache_item.tx = await self.db.get_transaction(txid=txid) - - merkle = None - if tx is None: - # fetch from network - _raw, merkle = await self.network.retriable_call( - self.network.get_transaction_and_merkle, txid, remote_height - ) - tx = Transaction(unhexlify(_raw), height=merkle.get('block_height')) - cache_item.tx = tx # make sure it's saved before caching it - await self.maybe_verify_transaction(tx, remote_height, merkle) - return tx - async def maybe_verify_transaction(self, tx, remote_height, merkle=None): tx.height = remote_height cached = self._tx_cache.get(tx.id) From fce80374f422e571515a6db63e336873749fa110 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Jul 2020 12:15:26 -0400 Subject: [PATCH 05/10] batched sync --- lbry/wallet/ledger.py | 192 ++++++++++++++++++++++++++++++++---------- 1 file changed, 149 insertions(+), 43 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 512987f23..70b22b103 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -164,6 +164,7 @@ class Ledger(metaclass=LedgerRegistry): self._utxo_reservation_lock = asyncio.Lock() self._header_processing_lock = asyncio.Lock() self._address_update_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + self._history_lock = asyncio.Lock() self.coin_selection_strategy = None self._known_addresses_out_of_sync = set() @@ -489,10 +490,9 @@ class Ledger(metaclass=LedgerRegistry): address, remote_status = update self._update_tasks.add(self.update_history(address, remote_status)) - async def update_history(self, address, remote_status, address_manager: AddressManager = None): + async def update_history(self, address, remote_status, address_manager: AddressManager = None, reattempt_update: bool = True): async with self._address_update_locks[address]: self._known_addresses_out_of_sync.discard(address) - local_status, local_history = await self.get_local_status_and_history(address) if local_status == remote_status: @@ -504,54 +504,79 @@ class Ledger(metaclass=LedgerRegistry): if not we_need: return True - cache_tasks: List[asyncio.Task[Transaction]] = [] - synced_history = StringIO() - loop = asyncio.get_running_loop() - for i, (txid, remote_height) in enumerate(remote_history): - if i < len(local_history) and local_history[i] == (txid, remote_height) and not cache_tasks: - synced_history.write(f'{txid}:{remote_height}:') - else: - check_local = (txid, remote_height) not in we_need - cache_tasks.append(loop.create_task( - self.cache_transaction(txid, remote_height, check_local=check_local) - )) - + acquire_lock_tasks = [] synced_txs = [] - for task in cache_tasks: - tx = await task + to_request = {} + pending_synced_history = {} + updated_cached_items = {} + already_synced = set() - check_db_for_txos = [] - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) - if cache_item is not None: - if cache_item.tx is None: - await cache_item.has_tx.wait() - assert cache_item.tx is not None - txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref - else: - check_db_for_txos.append(txi.txo_ref.id) + for i, (txid, remote_height) in enumerate(remote_history): + if not acquire_lock_tasks and i < len(local_history) and local_history[i] == (txid, remote_height): + pending_synced_history[i] = f'{txid}:{remote_height}:' + already_synced.add((txid, remote_height)) + continue + cache_item = self._tx_cache.get(txid) + if cache_item is None: + cache_item = TransactionCacheItem() + self._tx_cache[txid] = cache_item + acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) - referenced_txos = {} if not check_db_for_txos else { - txo.id: txo for txo in await self.db.get_txos( - txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True - ) - } + await asyncio.wait(acquire_lock_tasks) - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - continue - referenced_txo = referenced_txos.get(txi.txo_ref.id) - if referenced_txo is not None: - txi.txo_ref = referenced_txo.ref + tx_indexes = {} - synced_history.write(f'{tx.id}:{tx.height}:') + for i, (txid, remote_height) in enumerate(remote_history): + tx_indexes[txid] = i + if (txid, remote_height) in already_synced: + continue + cache_item = self._tx_cache.get(txid) + cache_item.pending_verifications += 1 + updated_cached_items[txid] = cache_item + + assert cache_item is not None, 'cache item is none' + assert cache_item.lock.locked(), 'cache lock is not held?' + # tx = cache_item.tx + # if cache_item.tx is not None and \ + # cache_item.tx.height >= remote_height and \ + # (cache_item.tx.is_verified or remote_height < 1): + # synced_txs.append(cache_item.tx) # cached tx is already up-to-date + # pending_synced_history[i] = f'{tx.id}:{tx.height}:' + # continue + to_request[i] = (txid, remote_height) + + log.info("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) + requested_txes = await self._request_transaction_batch(to_request) + for tx in requested_txes: + pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) + log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) + + assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}" + synced_history = "" + for remote_i, i in zip(range(len(remote_history)), sorted(pending_synced_history.keys())): + assert i == remote_i, f"{i} vs {remote_i}" + txid, height = remote_history[remote_i] + if f"{txid}:{height}:" != pending_synced_history[i]: + log.warning("history mismatch: %s:%i: vs %s", remote_history[remote_i], pending_synced_history[i]) + synced_history += pending_synced_history[i] + + cache_size = self.config.get("tx_cache_size", 100_000) + for txid, cache_item in updated_cached_items.items(): + cache_item.pending_verifications -= 1 + if cache_item.pending_verifications < 0: + log.warning("config value tx cache size %i needs to be increased", cache_size) + cache_item.pending_verifications = 0 + try: + cache_item.lock.release() + except RuntimeError: + log.warning("lock was already released?") + pass await self.db.save_transaction_io_batch( - synced_txs, address, self.address_to_hash160(address), synced_history.getvalue() + synced_txs, address, self.address_to_hash160(address), synced_history ) + await asyncio.wait([ self._on_transaction_controller.add(TransactionEvent(address, tx)) for tx in synced_txs @@ -563,8 +588,16 @@ class Ledger(metaclass=LedgerRegistry): if address_manager is not None: await address_manager.ensure_address_gap() + for txid, cache_item in updated_cached_items.items(): + if self._tx_cache.get(txid) is not cache_item: + log.warning("tx cache corrupted while syncing %s, reattempt sync=%s", address, reattempt_update) + if reattempt_update: + return await self.update_history(address, remote_status, address_manager, False) + return False + local_status, local_history = \ - await self.get_local_status_and_history(address, synced_history.getvalue()) + await self.get_local_status_and_history(address, synced_history) + if local_status != remote_status: if local_history == remote_history: log.warning( @@ -632,8 +665,8 @@ class Ledger(metaclass=LedgerRegistry): if not cached: # cache txs looked up by transaction_show too cached = TransactionCacheItem() - cached.tx = tx self._tx_cache[tx.id] = cached + cached.tx = tx if 0 < remote_height < len(self.headers) and cached.pending_verifications <= 1: # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case if not merkle: @@ -643,6 +676,79 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] + async def _request_transaction_batch(self, to_request): + header_cache = {} + batches = [[]] + remote_heights = {} + synced_txs = [] + + for idx in sorted(to_request): + txid = to_request[idx][0] + height = to_request[idx][1] + remote_heights[txid] = height + if len(batches[-1]) == 1: + batches.append([]) + batches[-1].append(txid) + if not len(batches[-1]): + batches.pop() + + async def _single_batch(batch): + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + for txid, (raw, merkle) in batch_result.items(): + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + cache_item = self._tx_cache.get(txid) + if cache_item is None: + cache_item = TransactionCacheItem() + self._tx_cache[txid] = cache_item + tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + cache_item.tx = tx + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) + else: + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + check_db_for_txos = [] + + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) + if cache_item is not None: + if cache_item.tx is None: + await cache_item.has_tx.wait() + assert cache_item.tx is not None + txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref + else: + check_db_for_txos.append(txi.txo_ref.id) + + referenced_txos = {} if not check_db_for_txos else { + txo.id: txo for txo in await self.db.get_txos( + txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True + ) + } + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + continue + referenced_txo = referenced_txos.get(txi.txo_ref.id) + if referenced_txo is not None: + txi.txo_ref = referenced_txo.ref + synced_txs.append(tx) + + if batches: + await asyncio.wait([ + asyncio.create_task( + _single_batch(_batch) + ) for _batch in batches + ]) + return synced_txs + async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: details = await self.db.get_address(address=address) for account in self.accounts: From fc1a06bc45cb885cd335005efc3868df52bb9d00 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Jul 2020 17:56:20 -0400 Subject: [PATCH 06/10] fix --- lbry/wallet/ledger.py | 53 ++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 70b22b103..ffa0c9d86 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_transaction_controller = StreamController() self.on_transaction = self._on_transaction_controller.stream self.on_transaction.listen( - lambda e: log.info( + lambda e: log.debug( '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id ) @@ -520,9 +520,13 @@ class Ledger(metaclass=LedgerRegistry): if cache_item is None: cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item + if len(acquire_lock_tasks) > 10000: + await asyncio.wait(acquire_lock_tasks) + acquire_lock_tasks.clear() acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) - await asyncio.wait(acquire_lock_tasks) + if acquire_lock_tasks: + await asyncio.wait(acquire_lock_tasks) tx_indexes = {} @@ -546,11 +550,10 @@ class Ledger(metaclass=LedgerRegistry): to_request[i] = (txid, remote_height) log.info("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) - requested_txes = await self._request_transaction_batch(to_request) + requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) - log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address) assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}" synced_history = "" @@ -573,14 +576,11 @@ class Ledger(metaclass=LedgerRegistry): log.warning("lock was already released?") pass + log.info("updating address count and status") await self.db.save_transaction_io_batch( - synced_txs, address, self.address_to_hash160(address), synced_history + [], address, self.address_to_hash160(address), synced_history ) - - await asyncio.wait([ - self._on_transaction_controller.add(TransactionEvent(address, tx)) - for tx in synced_txs - ]) + log.info("updated address count and status") if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -623,6 +623,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync.add(address) return False else: + log.info("synced %s", address) return True async def cache_transaction(self, txid, remote_height, check_local=True): @@ -676,23 +677,29 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def _request_transaction_batch(self, to_request): + async def _request_transaction_batch(self, to_request, remote_history_size, address): header_cache = {} batches = [[]] remote_heights = {} synced_txs = [] - + heights_in_batch = 1 for idx in sorted(to_request): txid = to_request[idx][0] height = to_request[idx][1] remote_heights[txid] = height - if len(batches[-1]) == 1: + if idx > 1 and height != remote_heights[batches[-1][-1]]: + heights_in_batch += 1 + if len(batches[-1]) == 100 or heights_in_batch == 10: batches.append([]) + heights_in_batch = 1 batches[-1].append(txid) if not len(batches[-1]): batches.pop() + last_showed_synced_count = 0 + async def _single_batch(batch): + this_batch_synced = [] batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] @@ -740,13 +747,23 @@ class Ledger(metaclass=LedgerRegistry): if referenced_txo is not None: txi.txo_ref = referenced_txo.ref synced_txs.append(tx) - - if batches: + this_batch_synced.append(tx) + await self.db.save_transaction_io_batch( + this_batch_synced, address, self.address_to_hash160(address), "" + ) await asyncio.wait([ - asyncio.create_task( - _single_batch(_batch) - ) for _batch in batches + self._on_transaction_controller.add(TransactionEvent(address, tx)) + for tx in this_batch_synced ]) + nonlocal last_showed_synced_count + if last_showed_synced_count + 100 < len(synced_txs): + log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) + last_showed_synced_count = len(synced_txs) + + await asyncio.wait( + [_single_batch(batch) for batch in batches] + ) + log.info("finished syncing history for %s", address) return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: From 9d5370be5f544df4ccbe9d7789e5be829028166b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Jul 2020 13:34:08 -0400 Subject: [PATCH 07/10] fix --- lbry/wallet/ledger.py | 49 +++++++++++++++++++------------- tests/unit/wallet/test_ledger.py | 13 +++++++-- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index ffa0c9d86..a303427bd 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -502,6 +502,12 @@ class Ledger(metaclass=LedgerRegistry): remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) we_need = set(remote_history) - set(local_history) if not we_need: + remote_missing = set(local_history) - set(remote_history) + if remote_missing: + log.warning( + "%i transactions we have for %s are not in the remote address history", + len(remote_missing), address + ) return True acquire_lock_tasks = [] @@ -511,18 +517,20 @@ class Ledger(metaclass=LedgerRegistry): updated_cached_items = {} already_synced = set() + already_synced_offset = 0 for i, (txid, remote_height) in enumerate(remote_history): - if not acquire_lock_tasks and i < len(local_history) and local_history[i] == (txid, remote_height): + if i == already_synced_offset and i < len(local_history) and local_history[i] == (txid, remote_height): pending_synced_history[i] = f'{txid}:{remote_height}:' already_synced.add((txid, remote_height)) + already_synced_offset += 1 continue cache_item = self._tx_cache.get(txid) if cache_item is None: cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item - if len(acquire_lock_tasks) > 10000: - await asyncio.wait(acquire_lock_tasks) - acquire_lock_tasks.clear() + + for txid, remote_height in remote_history[already_synced_offset:]: + cache_item = self._tx_cache[txid] acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) if acquire_lock_tasks: @@ -576,11 +584,9 @@ class Ledger(metaclass=LedgerRegistry): log.warning("lock was already released?") pass - log.info("updating address count and status") await self.db.save_transaction_io_batch( [], address, self.address_to_hash160(address), synced_history ) - log.info("updated address count and status") if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -623,7 +629,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync.add(address) return False else: - log.info("synced %s", address) + log.info("finished syncing transaction history for %s, %i known txs", address, len(local_history)) return True async def cache_transaction(self, txid, remote_height, check_local=True): @@ -682,14 +688,16 @@ class Ledger(metaclass=LedgerRegistry): batches = [[]] remote_heights = {} synced_txs = [] - heights_in_batch = 1 + heights_in_batch = 0 + last_height = 0 for idx in sorted(to_request): txid = to_request[idx][0] height = to_request[idx][1] remote_heights[txid] = height - if idx > 1 and height != remote_heights[batches[-1][-1]]: + if height != last_height: heights_in_batch += 1 - if len(batches[-1]) == 100 or heights_in_batch == 10: + last_height = height + if len(batches[-1]) == 100 or heights_in_batch == 20: batches.append([]) heights_in_batch = 1 batches[-1].append(txid) @@ -728,10 +736,8 @@ class Ledger(metaclass=LedgerRegistry): continue cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) if cache_item is not None: - if cache_item.tx is None: - await cache_item.has_tx.wait() - assert cache_item.tx is not None - txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref + if cache_item.tx is not None: + txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref else: check_db_for_txos.append(txi.txo_ref.id) @@ -740,12 +746,20 @@ class Ledger(metaclass=LedgerRegistry): txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True ) } + for txi in tx.inputs: if txi.txo_ref.txo is not None: continue referenced_txo = referenced_txos.get(txi.txo_ref.id) if referenced_txo is not None: txi.txo_ref = referenced_txo.ref + continue + cache_item = self._tx_cache.get(txi.txo_ref.id) + if cache_item is None: + cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem() + if cache_item.tx is not None: + txi.txo_ref = cache_item.tx.ref + synced_txs.append(tx) this_batch_synced.append(tx) await self.db.save_transaction_io_batch( @@ -759,11 +773,8 @@ class Ledger(metaclass=LedgerRegistry): if last_showed_synced_count + 100 < len(synced_txs): log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) last_showed_synced_count = len(synced_txs) - - await asyncio.wait( - [_single_batch(batch) for batch in batches] - ) - log.info("finished syncing history for %s", address) + for batch in batches: + await _single_batch(batch) return synced_txs async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: diff --git a/tests/unit/wallet/test_ledger.py b/tests/unit/wallet/test_ledger.py index bfe5cc71b..e1822e2b6 100644 --- a/tests/unit/wallet/test_ledger.py +++ b/tests/unit/wallet/test_ledger.py @@ -35,11 +35,17 @@ class MockNetwork: async def get_transaction_and_merkle(self, tx_hash, known_height=None): tx = await self.get_transaction(tx_hash) - merkle = {} + merkle = {'block_height': -1} if known_height: merkle = await self.get_merkle(tx_hash, known_height) return tx, merkle + async def get_transaction_batch(self, txids): + return { + txid: await self.get_transaction_and_merkle(txid) + for txid in txids + } + class LedgerTestCase(AsyncioTestCase): @@ -120,8 +126,9 @@ class TestSynchronization(LedgerTestCase): self.ledger.network.get_history_called = [] self.ledger.network.get_transaction_called = [] - for cache_item in self.ledger._tx_cache.values(): - cache_item.tx.is_verified = True + self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified) + self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified) + self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified) await self.ledger.update_history(address, '') self.assertListEqual(self.ledger.network.get_history_called, [address]) self.assertListEqual(self.ledger.network.get_transaction_called, []) From 76e60d9bc3b5fec037674c34968b3c274a6aae64 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 14 Jul 2020 20:31:40 -0400 Subject: [PATCH 08/10] logging --- lbry/wallet/ledger.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index a303427bd..8b24565f6 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_transaction_controller = StreamController() self.on_transaction = self._on_transaction_controller.stream self.on_transaction.listen( - lambda e: log.debug( + lambda e: log.info( '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id ) @@ -557,7 +557,7 @@ class Ledger(metaclass=LedgerRegistry): # continue to_request[i] = (txid, remote_height) - log.info("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) + log.debug("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" @@ -629,7 +629,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync.add(address) return False else: - log.info("finished syncing transaction history for %s, %i known txs", address, len(local_history)) + log.debug("finished syncing transaction history for %s, %i known txs", address, len(local_history)) return True async def cache_transaction(self, txid, remote_height, check_local=True): From 2d8703bb8de54eaa3f4f80761173ba20f66dd645 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 14 Jul 2020 21:08:46 -0400 Subject: [PATCH 09/10] pylint --- lbry/wallet/ledger.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 8b24565f6..f41ffb64c 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -3,7 +3,6 @@ import copy import time import asyncio import logging -from io import StringIO from datetime import datetime from functools import partial from operator import itemgetter @@ -490,7 +489,8 @@ class Ledger(metaclass=LedgerRegistry): address, remote_status = update self._update_tasks.add(self.update_history(address, remote_status)) - async def update_history(self, address, remote_status, address_manager: AddressManager = None, reattempt_update: bool = True): + async def update_history(self, address, remote_status, address_manager: AddressManager = None, + reattempt_update: bool = True): async with self._address_update_locks[address]: self._known_addresses_out_of_sync.discard(address) local_status, local_history = await self.get_local_status_and_history(address) @@ -557,19 +557,23 @@ class Ledger(metaclass=LedgerRegistry): # continue to_request[i] = (txid, remote_height) - log.debug("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) + log.debug( + "request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), + len(remote_history), address + ) requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" synced_txs.append(tx) - assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}" + assert len(pending_synced_history) == len(remote_history), \ + f"{len(pending_synced_history)} vs {len(remote_history)}" synced_history = "" for remote_i, i in zip(range(len(remote_history)), sorted(pending_synced_history.keys())): assert i == remote_i, f"{i} vs {remote_i}" txid, height = remote_history[remote_i] if f"{txid}:{height}:" != pending_synced_history[i]: - log.warning("history mismatch: %s:%i: vs %s", remote_history[remote_i], pending_synced_history[i]) + log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i]) synced_history += pending_synced_history[i] cache_size = self.config.get("tx_cache_size", 100_000) @@ -582,7 +586,6 @@ class Ledger(metaclass=LedgerRegistry): cache_item.lock.release() except RuntimeError: log.warning("lock was already released?") - pass await self.db.save_transaction_io_batch( [], address, self.address_to_hash160(address), synced_history @@ -724,7 +727,7 @@ class Ledger(metaclass=LedgerRegistry): try: header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) except IndexError: - log.warning("failed to verify %s at height %i", tx.id, merkle_height) + log.warning("failed to verify %s at height %i", tx.id, merkle_height) else: header_cache[remote_heights[txid]] = header tx.position = merkle['pos'] From 2f81e9d3741dce6448ddbf4b24169fe1b8be3aca Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Tue, 14 Jul 2020 23:13:10 -0400 Subject: [PATCH 10/10] pylint --- lbry/wallet/ledger.py | 2 +- setup.cfg | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index f41ffb64c..50d68b2ac 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -704,7 +704,7 @@ class Ledger(metaclass=LedgerRegistry): batches.append([]) heights_in_batch = 1 batches[-1].append(txid) - if not len(batches[-1]): + if not batches[-1]: batches.pop() last_showed_synced_count = 0 diff --git a/setup.cfg b/setup.cfg index e8bc1920b..7a5b684e5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,6 +35,7 @@ disable= too-many-statements, too-many-nested-blocks, too-many-public-methods, + too-many-return-statements, too-many-instance-attributes, protected-access, unused-argument