From 2faa29b1c4f94a20d99b12daa027b7739ef44bcd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 9 Oct 2020 13:34:45 -0400 Subject: [PATCH 1/7] fix dht_monitor script --- scripts/dht_monitor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py index c6040f8d9..8b2a3f425 100644 --- a/scripts/dht_monitor.py +++ b/scripts/dht_monitor.py @@ -1,6 +1,7 @@ import curses import time import asyncio +import lbry.wallet from lbry.conf import Config from lbry.extras.daemon.client import daemon_rpc From 76946c447ffc0abca76a233c353fb410abdc32e8 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 18 Oct 2020 21:02:19 -0400 Subject: [PATCH 2/7] use single_call_context for claim_search and resolve --- lbry/wallet/ledger.py | 112 +++++++++++++++++++++++++++++++++-------- lbry/wallet/network.py | 40 ++++++++++++--- 2 files changed, 124 insertions(+), 28 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index ed0d954a4..e2fe338f8 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -29,7 +29,6 @@ from .constants import TXO_TYPES, CLAIM_TYPES, COIN, NULL_HASH32 from .bip32 import PubKey, PrivateKey from .coinselection import CoinSelector - log = logging.getLogger(__name__) LedgerType = Type['BaseLedger'] @@ -687,6 +686,59 @@ class Ledger(metaclass=LedgerRegistry): tx.position = merkle['pos'] tx.is_verified = merkle_root == header['merkle_root'] + async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...], session_override=None): + header_cache = {} + batches = [[]] + remote_heights = {} + transactions = [] + heights_in_batch = 0 + last_height = 0 + + for txid, height in sorted(to_request, key=lambda x: x[1]): + remote_heights[txid] = height + if height != last_height: + heights_in_batch += 1 + last_height = height + if len(batches[-1]) == 100 or heights_in_batch == 20: + batches.append([]) + heights_in_batch = 1 + batches[-1].append(txid) + if not batches[-1]: + batches.pop() + + async def _single_batch(batch): + if session_override: + batch_result = await self.network.get_transaction_batch( + batch, restricted=False, session=session_override + ) + else: + batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch) + for txid, (raw, merkle) in batch_result.items(): + remote_height = remote_heights[txid] + merkle_height = merkle['block_height'] + cache_item = self._tx_cache.get(txid) + if cache_item is None: + cache_item = TransactionCacheItem() + self._tx_cache[txid] = cache_item + tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) + tx.height = remote_height + cache_item.tx = tx + if 'merkle' in merkle and remote_heights[txid] > 0: + merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) + try: + header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) + except IndexError: + log.warning("failed to verify %s at height %i", tx.id, merkle_height) + else: + header_cache[remote_heights[txid]] = header + tx.position = merkle['pos'] + tx.is_verified = merkle_root == header['merkle_root'] + transactions.append(tx) + + for batch in batches: + await _single_batch(batch) + return transactions + async def _request_transaction_batch(self, to_request, remote_history_size, address): header_cache = {} batches = [[]] @@ -844,14 +896,17 @@ class Ledger(metaclass=LedgerRegistry): include_is_my_output=False, include_sent_supports=False, include_sent_tips=False, - include_received_tips=False) -> Tuple[List[Output], dict, int, int]: + include_received_tips=False, + session_override=None) -> Tuple[List[Output], dict, int, int]: encoded_outputs = await query outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs = [] if len(outputs.txs) > 0: - txs: List[Transaction] = await asyncio.gather(*( - self.cache_transaction(*tx) for tx in outputs.txs - )) + txs: List[Transaction] = [] + if session_override: + txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs), session_override))) + else: + txs.extend((await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)))) _txos, blocked = outputs.inflate(txs) @@ -924,15 +979,28 @@ class Ledger(metaclass=LedgerRegistry): return txos, blocked, outputs.offset, outputs.total async def resolve(self, accounts, urls, new_sdk_server=None, **kwargs): - if new_sdk_server: - resolve = partial(self.network.new_resolve, new_sdk_server) - else: - resolve = partial(self.network.retriable_call, self.network.resolve) - urls_copy = list(urls) txos = [] - while urls_copy: - batch, urls_copy = urls_copy[:500], urls_copy[500:] - txos.extend((await self._inflate_outputs(resolve(batch), accounts, **kwargs))[0]) + urls_copy = list(urls) + + if new_sdk_server: + resolve = partial(self.network.resolve, new_sdk_server) + while urls_copy: + batch, urls_copy = urls_copy[:500], urls_copy[500:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, **kwargs + ))[0] + ) + else: + async with self.network.single_call_context(self.network.resolve) as (resolve, session): + while urls_copy: + batch, urls_copy = urls_copy[:500], urls_copy[500:] + txos.extend( + (await self._inflate_outputs( + resolve(batch), accounts, session_override=session, **kwargs + ))[0] + ) + assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." result = {} for url, txo in zip(urls, txos): @@ -953,13 +1021,17 @@ class Ledger(metaclass=LedgerRegistry): new_sdk_server=None, **kwargs) -> Tuple[List[Output], dict, int, int]: if new_sdk_server: claim_search = partial(self.network.new_claim_search, new_sdk_server) - else: - claim_search = self.network.claim_search - return await self._inflate_outputs( - claim_search(**kwargs), accounts, - include_purchase_receipt=include_purchase_receipt, - include_is_my_output=include_is_my_output - ) + return await self._inflate_outputs( + claim_search(**kwargs), accounts, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) + async with self.network.single_call_context(self.network.claim_search) as (claim_search, session): + return await self._inflate_outputs( + claim_search(**kwargs), accounts, session_override=session, + include_purchase_receipt=include_purchase_receipt, + include_is_my_output=include_is_my_output, + ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: for claim in (await self.claim_search(accounts, claim_id=claim_id, **kwargs))[0]: diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index e3e396e83..d4b460600 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -3,6 +3,8 @@ import asyncio import json from time import perf_counter from operator import itemgetter +from contextlib import asynccontextmanager +from functools import partial from typing import Dict, Optional, Tuple import aiohttp @@ -230,8 +232,8 @@ class Network: def is_connected(self): return self.client and not self.client.is_closing() - def rpc(self, list_or_method, args, restricted=True): - session = self.client if restricted else self.session_pool.fastest_session + def rpc(self, list_or_method, args, restricted=True, session=None): + session = session or (self.client if restricted else self.session_pool.fastest_session) if session and not session.is_closing(): return session.send_request(list_or_method, args) else: @@ -253,6 +255,28 @@ class Network: pass raise asyncio.CancelledError() # if we got here, we are shutting down + @asynccontextmanager + async def single_call_context(self, function, *args, **kwargs): + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + await self.on_connected.first + await self.session_pool.wait_for_fastest_session() + server = self.session_pool.fastest_session.server + session = ClientSession(network=self, server=server) + + async def call_with_reconnect(*a, **kw): + while self.running: + if not session.available: + await session.create_connection() + try: + return await partial(function, *args, session_override=session, **kwargs)(*a, **kw) + except asyncio.TimeoutError: + log.warning("'%s' failed, retrying", function.__name__) + try: + yield (call_with_reconnect, session) + finally: + await session.close() + def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] @@ -261,9 +285,9 @@ class Network: restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 return self.rpc('blockchain.transaction.get', [tx_hash], restricted) - def get_transaction_batch(self, txids): + def get_transaction_batch(self, txids, restricted=True, session=None): # use any server if its old, otherwise restrict to who gave us the history - return self.rpc('blockchain.transaction.get_batch', txids, True) + return self.rpc('blockchain.transaction.get_batch', txids, restricted, session) def get_transaction_and_merkle(self, tx_hash, known_height=None): # use any server if its old, otherwise restrict to who gave us the history @@ -316,11 +340,11 @@ class Network: def get_claims_by_ids(self, claim_ids): return self.rpc('blockchain.claimtrie.getclaimsbyids', claim_ids) - def resolve(self, urls): - return self.rpc('blockchain.claimtrie.resolve', urls) + def resolve(self, urls, session_override=None): + return self.rpc('blockchain.claimtrie.resolve', urls, False, session_override) - def claim_search(self, **kwargs): - return self.rpc('blockchain.claimtrie.search', kwargs) + def claim_search(self, session_override=None, **kwargs): + return self.rpc('blockchain.claimtrie.search', kwargs, False, session_override) async def new_resolve(self, server, urls): message = {"method": "resolve", "params": {"urls": urls, "protobuf": True}} From 925a458abe09fcec03511c03507d97553db46e88 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Oct 2020 10:55:55 -0400 Subject: [PATCH 3/7] tags --- lbry/wallet/server/db/common.py | 114 ++++++++++++++++++++++++++++++-- 1 file changed, 107 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/server/db/common.py b/lbry/wallet/server/db/common.py index d1b88d49d..99655753b 100644 --- a/lbry/wallet/server/db/common.py +++ b/lbry/wallet/server/db/common.py @@ -51,8 +51,8 @@ MOST_USED_TAGS = { "tutorial", "video game", "weapons", - "pc", "playthrough", + "pc", "anime", "how to", "btc", @@ -80,9 +80,9 @@ MOST_USED_TAGS = { "espaƱol", "money", "music video", + "nintendo", "movie", "coronavirus", - "nintendo", "donald trump", "steam", "trailer", @@ -90,10 +90,10 @@ MOST_USED_TAGS = { "podcast", "xbox one", "survival", + "audio", "linux", "travel", "funny moments", - "audio", "litecoin", "animation", "gamer", @@ -101,20 +101,120 @@ MOST_USED_TAGS = { "playstation", "bitcoin news", "history", - "fox news", "xxx", - "god", + "fox news", "dance", + "god", "adventure", "liberal", + "2020", "horror", "government", "freedom", - "2020", "reaction", "meme", "photography", - "truth" + "truth", + "health", + "lbry", + "family", + "online", + "eth", + "crypto news", + "diy", + "trading", + "gold", + "memes", + "world", + "space", + "lol", + "covid-19", + "rpg", + "humor", + "democrat", + "film", + "call of duty", + "tech", + "religion", + "conspiracy", + "rap", + "cnn", + "hangoutsonair", + "unboxing", + "fiction", + "conservative", + "cars", + "hoa", + "epic", + "programming", + "progressive", + "cryptocurrency news", + "classical", + "jesus", + "movies", + "book", + "ps3", + "republican", + "fitness", + "books", + "multiplayer", + "animals", + "pokemon", + "bitcoin price", + "facebook", + "sharefactory", + "criptomonedas", + "cod", + "bible", + "business", + "stream", + "comics", + "how", + "fail", + "nsfw", + "new music", + "satire", + "pets & animals", + "computer", + "classical music", + "indie", + "musica", + "msnbc", + "fps", + "mod", + "sport", + "sony", + "ripple", + "auto", + "rock", + "marvel", + "complete", + "mining", + "political", + "mobile", + "pubg", + "hip hop", + "flat earth", + "xbox 360", + "reviews", + "vlogging", + "latest news", + "hack", + "tarot", + "iphone", + "media", + "cute", + "christian", + "free speech", + "trap", + "war", + "remix", + "ios", + "xrp", + "spirituality", + "song", + "league of legends", + "cat" } MATURE_TAGS = [ From f21ab49ac5a0c16bebaa30382014bc92758e1164 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 3 Nov 2020 10:39:38 -0500 Subject: [PATCH 4/7] bump aioupnp requirement --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6dcedbbb5..42f185829 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ setup( }, install_requires=[ 'aiohttp==3.5.4', - 'aioupnp==0.0.17', + 'aioupnp==0.0.18', 'appdirs==1.4.3', 'certifi>=2018.11.29', 'colorama==0.3.7', From 5517d2bf56509dde8701ecea8af2f21aba215de0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 3 Nov 2020 16:23:31 -0500 Subject: [PATCH 5/7] fix new_sdk_server arg for resolve --- lbry/wallet/ledger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index e2fe338f8..8d1203011 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -983,7 +983,7 @@ class Ledger(metaclass=LedgerRegistry): urls_copy = list(urls) if new_sdk_server: - resolve = partial(self.network.resolve, new_sdk_server) + resolve = partial(self.network.new_resolve, new_sdk_server) while urls_copy: batch, urls_copy = urls_copy[:500], urls_copy[500:] txos.extend( From d83936a66a4f0ace21a32d038c543aa5835a352e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 4 Nov 2020 15:33:19 -0500 Subject: [PATCH 6/7] fix uncaught error --- lbry/wallet/ledger.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 8d1203011..77666e509 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -478,12 +478,14 @@ class Ledger(metaclass=LedgerRegistry): for address, remote_status in zip(batch, results): self._update_tasks.add(self.update_history(address, remote_status, address_manager)) addresses_remaining = addresses_remaining[batch_size:] - log.info("subscribed to %i/%i addresses on %s:%i", len(addresses) - len(addresses_remaining), - len(addresses), *self.network.client.server_address_and_port) - log.info( - "finished subscribing to %i addresses on %s:%i", len(addresses), - *self.network.client.server_address_and_port - ) + if self.network.client and self.network.client.server_address_and_port: + log.info("subscribed to %i/%i addresses on %s:%i", len(addresses) - len(addresses_remaining), + len(addresses), *self.network.client.server_address_and_port) + if self.network.client and self.network.client.server_address_and_port: + log.info( + "finished subscribing to %i addresses on %s:%i", len(addresses), + *self.network.client.server_address_and_port + ) def process_status_update(self, update): address, remote_status = update From 853885e2ffba0753aca98d9a539cd6e049b08ee4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 5 Nov 2020 15:02:16 -0500 Subject: [PATCH 7/7] debug --- tests/integration/blockchain/test_claim_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index c80624c06..409264f93 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -89,7 +89,7 @@ class ClaimSearchCommand(ClaimTestCase): # 23829 claim ids makes the request just large enough claim_ids = [ '0000000000000000000000000000000000000000', - ] * 23829 + ] * 33829 with self.assertRaises(ConnectionResetError): await self.claim_search(claim_ids=claim_ids)