From 91323a21cfa509a4f0f4ea0dcfd658c390a7e4f8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 13 Mar 2021 02:31:03 -0300 Subject: [PATCH 1/3] add hub_timeout and propagate it to network code --- lbry/conf.py | 1 + lbry/wallet/__init__.pyc | Bin 0 -> 1268 bytes lbry/wallet/manager.py | 2 ++ lbry/wallet/network.py | 1 + lbry/wallet/orchstr8/node.py | 1 + tests/integration/blockchain/test_network.py | 10 ++++++++++ 6 files changed, 15 insertions(+) create mode 100644 lbry/wallet/__init__.pyc diff --git a/lbry/conf.py b/lbry/conf.py index 96b4b4389..5edbd4108 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -628,6 +628,7 @@ class Config(CLIConfig): # protocol timeouts download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0) + hub_timeout = Float("Timeout when making a hub request", 30.0) peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT) diff --git a/lbry/wallet/__init__.pyc b/lbry/wallet/__init__.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d4854844fe6b60a10a9b0972d81a6424f412ff41 GIT binary patch literal 1268 zcmZ8g+in{-5IyomzRI`wQahJAP9OB*RRaVGPy|JrIDmsVPC^MN%9q7@$x1}XU4cv4 zaeuOZ(I3cjfzFUh>_o)Qa?ad#&WvV1cb5PC=fAwf^fyEA-{@)30u&%gwjf(jGmtY- zvyih;bC7dT^H6QrdAK%Y8*Tw|0d5g;5pD@`2`%CFfYp_Rfk02kBzK(u(gO~`5BkUm9O)j#sc0CR8*BC6>%X+uRY|})nO>d06o0-SUCw$d z%iE%&BtF3lWINa4s`ejm3ZH334l!iPr$OzaC=mmg2H8IHoMOI?MOh%^6Q zmi43xVU6!fdEGSA>S2kXzNv;*ztPuWpR`k>ec{WIDGPg|gZ9o;hDa&9vt$iCuz@eC zrYM7{tH{v1Nf=K;m?zWgif&!r`mjnOS^A|n;}AomJ?Y7d^YuTRnV{dAPo|37aZ-fh zf<}A8&8aSw_RV?sC(g|5=6O{sovT8-y2|rdZHmzzeG6Qe?>r{nk_S&T&ztdOS_49O z+-Wrgw;;GMwG&aGm5vbDG1RG-l8^~mdmRWG0(Z5K;5KNEYPY^s7 z!97KW!VS5C$0{~iY_XYZHjztlhrw+IxsI9=CUzL+y^Ox2$eP$Ay-Uw6(J;pUr(SBe qm*+I)kmuR>I%a(CYBjO?S6 Date: Sat, 13 Mar 2021 02:39:40 -0300 Subject: [PATCH 2/3] move request semaphore to session and apply to all requests --- lbry/wallet/network.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 955cbcb22..56416ed1b 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -30,7 +30,7 @@ class ClientSession(BaseClientSession): self.response_time: Optional[float] = None self.connection_latency: Optional[float] = None self._response_samples = 0 - self.pending_amount = 0 + self._concurrency = asyncio.Semaphore(16) @property def available(self): @@ -56,9 +56,9 @@ class ClientSession(BaseClientSession): return result async def send_request(self, method, args=()): - self.pending_amount += 1 log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout) try: + await self._concurrency.acquire() if method == 'server.version': return await self.send_timed_server_version_request(args, self.timeout) request = asyncio.ensure_future(super().send_request(method, args)) @@ -92,7 +92,7 @@ class ClientSession(BaseClientSession): # self.synchronous_close() raise finally: - self.pending_amount -= 1 + self._concurrency.release() async def ensure_server_version(self, required=None, timeout=3): required = required or self.network.PROTOCOL_VERSION @@ -155,7 +155,6 @@ class Network: # self._switch_task: Optional[asyncio.Task] = None self.running = False self.remote_height: int = 0 - self._concurrency = asyncio.Semaphore(16) self._on_connected_controller = StreamController() self.on_connected = self._on_connected_controller.stream @@ -377,18 +376,17 @@ class Network: raise ConnectionError("Attempting to send rpc request when connection is not available.") async def retriable_call(self, function, *args, **kwargs): - async with self._concurrency: - while self.running: - if not self.is_connected: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") - self._urgent_need_reconnect.set() - await self.on_connected.first - try: - return await function(*args, **kwargs) - except asyncio.TimeoutError: - log.warning("Wallet server call timed out, retrying.") - except ConnectionError: - log.warning("connection error") + while self.running: + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + self._urgent_need_reconnect.set() + await self.on_connected.first + try: + return await function(*args, **kwargs) + except asyncio.TimeoutError: + log.warning("Wallet server call timed out, retrying.") + except ConnectionError: + log.warning("connection error") raise asyncio.CancelledError() # if we got here, we are shutting down From 309e957a854329f087f31281aa5fc74184d8b0ce Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 13 Mar 2021 02:49:33 -0300 Subject: [PATCH 3/3] add concurrent_hub_requests conf --- lbry/conf.py | 1 + lbry/wallet/manager.py | 2 ++ lbry/wallet/network.py | 12 ++++++++---- lbry/wallet/orchstr8/node.py | 3 ++- tests/integration/blockchain/test_network.py | 8 ++++++-- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/lbry/conf.py b/lbry/conf.py index 5edbd4108..4250fe579 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -650,6 +650,7 @@ class Config(CLIConfig): "Maximum number of peers to connect to while downloading a blob", 4, previous_names=['max_connections_per_stream'] ) + concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32) fixed_peer_delay = Float( "Amount of seconds before adding the reflector servers as potential peers to download from in case dht" "peers are not found or are slow", 2.0 diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 1904f303a..6228159b6 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -187,6 +187,7 @@ class WalletManager: 'default_servers': config.lbryum_servers, 'known_hubs': config.known_hubs, 'jurisdiction': config.jurisdiction, + 'concurrent_hub_requests': config.concurrent_hub_requests, 'data_path': config.wallet_dir, 'tx_cache_size': config.transaction_cache_size } @@ -238,6 +239,7 @@ class WalletManager: 'known_hubs': self.config.known_hubs, 'jurisdiction': self.config.jurisdiction, 'hub_timeout': self.config.hub_timeout, + 'concurrent_hub_requests': self.config.concurrent_hub_requests, 'data_path': self.config.wallet_dir, } if Config.lbryum_servers.is_set(self.config): diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 56416ed1b..a7305e1a2 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -20,7 +20,7 @@ log = logging.getLogger(__name__) class ClientSession(BaseClientSession): - def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs): + def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs): self.network = network self.server = server super().__init__(*args, **kwargs) @@ -30,7 +30,11 @@ class ClientSession(BaseClientSession): self.response_time: Optional[float] = None self.connection_latency: Optional[float] = None self._response_samples = 0 - self._concurrency = asyncio.Semaphore(16) + self._concurrency = asyncio.Semaphore(concurrency) + + @property + def concurrency(self): + return self._concurrency._value @property def available(self): @@ -292,8 +296,8 @@ class Network: if (pong is not None and self.jurisdiction is not None) and \ (pong.country_name != self.jurisdiction): continue - client = ClientSession(network=self, server=(host, port)) - client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout']) + client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'], + concurrency=self.config['concurrent_hub_requests']) try: await client.create_connection() log.warning("Connected to spv server %s:%i", host, port) diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 9ae8cfc8d..5209d6508 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -135,8 +135,9 @@ class WalletNode: 'explicit_servers': [(spv_node.hostname, spv_node.port)], 'default_servers': Config.lbryum_servers.default, 'data_path': self.data_path, - 'known_hubs': config.known_hubs if config else KnownHubsList() + 'known_hubs': config.known_hubs if config else KnownHubsList(), 'hub_timeout': 30, + 'concurrent_hub_requests': 32, } }, 'wallets': [wallet_file_name] diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 886c234a7..9447dc835 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -139,14 +139,18 @@ class ReconnectTests(IntegrationTestCase): await self.ledger.network.on_connected.first self.assertTrue(self.ledger.network.is_connected) - async def test_timeout_propagated_from_config(self): + async def test_timeout_and_concurrency_propagated_from_config(self): conf = Config() self.assertEqual(self.ledger.network.client.timeout, 30) + self.assertEqual(self.ledger.network.client.concurrency, 32) conf.hub_timeout = 123.0 - conf.lbryum_servers = self.ledger.config['default_servers'] + conf.concurrent_hub_requests = 42 + conf.known_hubs = self.ledger.config['known_hubs'] + conf.wallet_dir = self.ledger.config['data_path'] self.manager.config = conf await self.manager.reset() self.assertEqual(self.ledger.network.client.timeout, 123) + self.assertEqual(self.ledger.network.client.concurrency, 42) # async def test_online_but_still_unavailable(self): # # Edge case. See issue #2445 for context