From b8c16d8ac5bad777b4596cc50ae47c870fa4fe09 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 8 Dec 2020 12:45:45 -0500 Subject: [PATCH 1/2] fix mempool txs being re-requested with each address notification --- lbry/wallet/ledger.py | 23 +++++++++++------------ lbry/wallet/server/session.py | 2 +- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index b9487aeba..d30d018e7 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -513,7 +513,6 @@ class Ledger(metaclass=LedgerRegistry): return True acquire_lock_tasks = [] - synced_txs = [] to_request = {} pending_synced_history = {} updated_cached_items = {} @@ -531,9 +530,16 @@ class Ledger(metaclass=LedgerRegistry): cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item + unsynced_offset = already_synced_offset for txid, remote_height in remote_history[already_synced_offset:]: cache_item = self._tx_cache[txid] - acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) + if cache_item.tx is not None and cache_item.tx.height >= remote_height \ + and (not cache_item.tx.is_verified or remote_height < 1): + pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:' + already_synced.add((txid, cache_item.tx.height)) + else: + acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) + unsynced_offset += 1 if acquire_lock_tasks: await asyncio.wait(acquire_lock_tasks) @@ -550,23 +556,16 @@ class Ledger(metaclass=LedgerRegistry): assert cache_item is not None, 'cache item is none' assert cache_item.lock.locked(), 'cache lock is not held?' - # tx = cache_item.tx - # if cache_item.tx is not None and \ - # cache_item.tx.height >= remote_height and \ - # (cache_item.tx.is_verified or remote_height < 1): - # synced_txs.append(cache_item.tx) # cached tx is already up-to-date - # pending_synced_history[i] = f'{tx.id}:{tx.height}:' - # continue + to_request[i] = (txid, remote_height) log.debug( - "request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), - len(remote_history), address + "request %i transactions, %i/%i for %s are already synced", len(to_request), + len(pending_synced_history), len(remote_history), address ) requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) for tx in requested_txes: pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" - synced_txs.append(tx) assert len(pending_synced_history) == len(remote_history), \ f"{len(pending_synced_history)} vs {len(remote_history)}" diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index fd7477c6b..0a996d68e 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -651,7 +651,7 @@ class SessionManager: ] if header_tasks: self.logger.info(f'notify {len(header_tasks)} sessions of new header') - await asyncio.wait(header_tasks) + asyncio.create_task(asyncio.wait(header_tasks)) for hashX in touched.intersection(self.mempool_statuses.keys()): self.mempool_statuses.pop(hashX, None) From 8da73ad3dd7756ad4b5ccdfefb1a3e2a38ac5564 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 8 Dec 2020 14:42:28 -0500 Subject: [PATCH 2/2] improve hash_to_hex_str performance --- lbry/wallet/server/hash.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/server/hash.py b/lbry/wallet/server/hash.py index a1a5d8068..2c0201952 100644 --- a/lbry/wallet/server/hash.py +++ b/lbry/wallet/server/hash.py @@ -67,17 +67,17 @@ def hash160(x): return ripemd160(sha256(x)) -def hash_to_hex_str(x): +def hash_to_hex_str(x: bytes) -> str: """Convert a big-endian binary hash to displayed hex string. Display form of a binary hash is reversed and converted to hex. """ - return bytes(reversed(x)).hex() + return x[::-1].hex() -def hex_str_to_hash(x): +def hex_str_to_hash(x: str) -> bytes: """Convert a displayed hex string to a binary hash.""" - return bytes(reversed(hex_to_bytes(x))) + return hex_to_bytes(x)[::-1] class Base58Error(Exception):