From fa63bf758d8732fde6ec7ae8014830a6d94663a2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 20 Nov 2020 15:52:11 -0500 Subject: [PATCH 1/2] delete single_call_context, use session pool --- lbry/wallet/ledger.py | 87 +++++++---------------------------- lbry/wallet/network.py | 25 ---------- lbry/wallet/server/session.py | 5 +- 3 files changed, 18 insertions(+), 99 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 1676b01f1..c09b31eea 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -637,40 +637,6 @@ class Ledger(metaclass=LedgerRegistry): log.debug("finished syncing transaction history for %s, %i known txs", address, len(local_history)) return True - async def cache_transaction(self, txid, remote_height, check_local=True): - cache_item = self._tx_cache.get(txid) - if cache_item is None: - cache_item = self._tx_cache[txid] = TransactionCacheItem() - elif cache_item.tx is not None and \ - cache_item.tx.height >= remote_height and \ - (cache_item.tx.is_verified or remote_height < 1): - return cache_item.tx # cached tx is already up-to-date - - cache_item.pending_verifications += 1 - try: - async with cache_item.lock: - tx = cache_item.tx - if tx is None and check_local: - # check local db - tx = cache_item.tx = await self.db.get_transaction(txid=txid) - merkle = None - if tx is None: - # fetch from network - _raw, merkle = await self.network.retriable_call( - self.network.get_transaction_and_merkle, txid, remote_height - ) - tx = Transaction(unhexlify(_raw), height=merkle['block_height']) - cache_item.tx = tx # make sure it's saved before caching it - tx.height = remote_height - if merkle and 0 < remote_height < len(self.headers): - merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) - header = await self.headers.get(remote_height) - tx.position = merkle['pos'] - tx.is_verified = merkle_root == header['merkle_root'] - return tx - finally: - cache_item.pending_verifications -= 1 - async def maybe_verify_transaction(self, tx, remote_height, merkle=None): tx.height = remote_height cached = self._tx_cache.get(tx.id) @@ -688,7 +654,7 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] - async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): + async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]): header_cache = {} batches = [[]] remote_heights = {} @@ -709,12 +675,9 @@ class Ledger(metaclass=LedgerRegistry): batches.pop() async def _single_batch(batch): - if session_override: - batch_result = await self.network.get_transaction_batch( - batch, restricted=False, session=session_override - ) - else: - batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + batch_result = await self.network.retriable_call( + self.network.get_transaction_batch, batch, restricted=False + ) for txid, (raw, merkle) in batch_result.items(): remote_height = remote_heights[txid] merkle_height = merkle['block_height'] @@ -898,17 +861,13 @@ class Ledger(metaclass=LedgerRegistry): include_is_my_output=False, include_sent_supports=False, include_sent_tips=False, - include_received_tips=False, - session_override=None) -> Tuple[List[Output], dict, int, int]: + include_received_tips=False) -> Tuple[List[Output], dict, int, int]: encoded_outputs = await query outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs = [] if len(outputs.txs) > 0: txs: List[Transaction] = [] - if session_override: - txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs), session_override))) - else: - txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) + txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs)))) _txos, blocked = outputs.inflate(txs) @@ -983,25 +942,17 @@ class Ledger(metaclass=LedgerRegistry): async def resolve(self, accounts, urls, new_sdk_server=None, **kwargs): txos = [] urls_copy = list(urls) - if new_sdk_server: resolve = partial(self.network.new_resolve, new_sdk_server) - while urls_copy: - batch, urls_copy = urls_copy[:500], urls_copy[500:] - txos.extend( - (await self._inflate_outputs( - resolve(batch), accounts, **kwargs - ))[0] - ) else: - async with self.network.single_call_context(self.network.resolve) as (resolve, session): - while urls_copy: - batch, urls_copy = urls_copy[:500], urls_copy[500:] - txos.extend( - (await self._inflate_outputs( - resolve(batch), accounts, session_override=session, **kwargs - ))[0] - ) + resolve = partial(self.network.retriable_call, self.network.resolve) + while urls_copy: + batch, urls_copy = urls_copy[:100], urls_copy[100:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, **kwargs + ))[0] + ) assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." result = {} @@ -1023,17 +974,13 @@ class Ledger(metaclass=LedgerRegistry): new_sdk_server=None, **kwargs) -> Tuple[List[Output], dict, int, int]: if new_sdk_server: claim_search = partial(self.network.new_claim_search, new_sdk_server) - return await self._inflate_outputs( + else: + claim_search = self.network.claim_search + return await self._inflate_outputs( claim_search(**kwargs), accounts, include_purchase_receipt=include_purchase_receipt, include_is_my_output=include_is_my_output, ) - async with self.network.single_call_context(self.network.claim_search) as (claim_search, session): - return await self._inflate_outputs( - claim_search(**kwargs), accounts, session_override=session, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output, - ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: for claim in (await self.claim_search(accounts, claim_id=claim_id, **kwargs))[0]: diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index d4b460600..939a7c880 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -3,10 +3,7 @@ import asyncio import json from time import perf_counter from operator import itemgetter -from contextlib import asynccontextmanager -from functools import partial from typing import Dict, Optional, Tuple - import aiohttp from lbry import __version__ @@ -255,28 +252,6 @@ class Network: pass raise asyncio.CancelledError() # if we got here, we are shutting down - @asynccontextmanager - async def single_call_context(self, function, *args, **kwargs): - if not self.is_connected: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") - await self.on_connected.first - await self.session_pool.wait_for_fastest_session() - server = self.session_pool.fastest_session.server - session = ClientSession(network=self, server=server) - - async def call_with_reconnect(*a, **kw): - while self.running: - if not session.available: - await session.create_connection() - try: - return await partial(function, *args, session_override=session, **kwargs)(*a, **kw) - except asyncio.TimeoutError: - log.warning("'%s' failed, retrying", function.__name__) - try: - yield (call_with_reconnect, session) - finally: - await session.close() - def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index c5f79b116..79fbda84f 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1227,10 +1227,7 @@ class LBRYElectrumX(SessionBase): return await self.address_status(hashX) async def hashX_unsubscribe(self, hashX, alias): - try: - del self.hashX_subs[hashX] - except ValueError: - pass + self.hashX_subs.pop(hashX, None) def address_to_hashX(self, address): try: From 34f4f12eb9930b7f04d655185abd0b85621ee509 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 20 Nov 2020 16:15:57 -0500 Subject: [PATCH 2/2] pylint --- lbry/wallet/ledger.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index c09b31eea..35c890e6b 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -977,10 +977,10 @@ class Ledger(metaclass=LedgerRegistry): else: claim_search = self.network.claim_search return await self._inflate_outputs( - claim_search(**kwargs), accounts, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output, - ) + claim_search(**kwargs), accounts, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: for claim in (await self.claim_search(accounts, claim_id=claim_id, **kwargs))[0]: