diff --git a/lbry/conf.py b/lbry/conf.py index ed6295780..87d981cb2 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -566,6 +566,7 @@ class Config(CLIConfig): "Maximum number of peers to connect to while downloading a blob", 4, previous_names=['max_connections_per_stream'] ) + concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32) fixed_peer_delay = Float( "Amount of seconds before adding the reflector servers as potential peers to download from in case dht" "peers are not found or are slow", 2.0 diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 7b06f5f95..1cec6ad74 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -184,6 +184,7 @@ class WalletManager: 'auto_connect': True, 'hub_timeout': config.hub_timeout, 'default_servers': config.lbryum_servers, + 'concurrent_hub_requests': config.concurrent_hub_requests, 'data_path': config.wallet_dir, 'tx_cache_size': config.transaction_cache_size } @@ -228,6 +229,7 @@ class WalletManager: 'auto_connect': True, 'default_servers': self.config.lbryum_servers, 'hub_timeout': self.config.hub_timeout, + 'concurrent_hub_requests': self.config.concurrent_hub_requests, 'data_path': self.config.wallet_dir, } await self.ledger.stop() diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 4fbb970c7..c09482514 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -19,7 +19,7 @@ log = logging.getLogger(__name__) class ClientSession(BaseClientSession): - def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs): + def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs): self.network = network self.server = server super().__init__(*args, **kwargs) @@ -29,7 +29,11 @@ class ClientSession(BaseClientSession): self.response_time: Optional[float] = None self.connection_latency: Optional[float] = None self._response_samples = 0 - self._concurrency = asyncio.Semaphore(16) + self._concurrency = asyncio.Semaphore(concurrency) + + @property + def concurrency(self): + return self._concurrency._value @property def available(self): @@ -262,7 +266,8 @@ class Network: async def connect_to_fastest(self) -> Optional[ClientSession]: fastest_spvs = await self.get_n_fastest_spvs() for (host, port) in fastest_spvs: - client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout']) + client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'], + concurrency=self.config['concurrent_hub_requests']) try: await client.create_connection() log.warning("Connected to spv server %s:%i", host, port) diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 252da873c..3e5862063 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -133,6 +133,7 @@ class WalletNode: 'default_servers': [(spv_node.hostname, spv_node.port)], 'data_path': self.data_path, 'hub_timeout': 30, + 'concurrent_hub_requests': 32, } }, 'wallets': [wallet_file_name] diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 4f89930fd..781094ced 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -139,14 +139,18 @@ class ReconnectTests(IntegrationTestCase): await self.ledger.network.on_connected.first self.assertTrue(self.ledger.network.is_connected) - async def test_timeout_propagated_from_config(self): + async def test_timeout_and_concurrency_propagated_from_config(self): conf = Config() self.assertEqual(self.ledger.network.client.timeout, 30) + self.assertEqual(self.ledger.network.client.concurrency, 32) conf.hub_timeout = 123.0 + conf.concurrent_hub_requests = 42 conf.lbryum_servers = self.ledger.config['default_servers'] + conf.wallet_dir = self.ledger.config['data_path'] self.manager.config = conf await self.manager.reset() self.assertEqual(self.ledger.network.client.timeout, 123) + self.assertEqual(self.ledger.network.client.concurrency, 42) # async def test_online_but_still_unavailable(self): # # Edge case. See issue #2445 for context