add concurrent_hub_requests conf

This commit is contained in:
Victor Shyba 2021-03-13 02:49:33 -03:00
parent ac88ada50e
commit 92258a2764
5 changed files with 17 additions and 4 deletions

View file

@ -566,6 +566,7 @@ class Config(CLIConfig):
"Maximum number of peers to connect to while downloading a blob", 4, "Maximum number of peers to connect to while downloading a blob", 4,
previous_names=['max_connections_per_stream'] previous_names=['max_connections_per_stream']
) )
concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32)
fixed_peer_delay = Float( fixed_peer_delay = Float(
"Amount of seconds before adding the reflector servers as potential peers to download from in case dht" "Amount of seconds before adding the reflector servers as potential peers to download from in case dht"
"peers are not found or are slow", 2.0 "peers are not found or are slow", 2.0

View file

@ -184,6 +184,7 @@ class WalletManager:
'auto_connect': True, 'auto_connect': True,
'hub_timeout': config.hub_timeout, 'hub_timeout': config.hub_timeout,
'default_servers': config.lbryum_servers, 'default_servers': config.lbryum_servers,
'concurrent_hub_requests': config.concurrent_hub_requests,
'data_path': config.wallet_dir, 'data_path': config.wallet_dir,
'tx_cache_size': config.transaction_cache_size 'tx_cache_size': config.transaction_cache_size
} }
@ -228,6 +229,7 @@ class WalletManager:
'auto_connect': True, 'auto_connect': True,
'default_servers': self.config.lbryum_servers, 'default_servers': self.config.lbryum_servers,
'hub_timeout': self.config.hub_timeout, 'hub_timeout': self.config.hub_timeout,
'concurrent_hub_requests': self.config.concurrent_hub_requests,
'data_path': self.config.wallet_dir, 'data_path': self.config.wallet_dir,
} }
await self.ledger.stop() await self.ledger.stop()

View file

@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
class ClientSession(BaseClientSession): class ClientSession(BaseClientSession):
def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs): def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs):
self.network = network self.network = network
self.server = server self.server = server
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -29,7 +29,11 @@ class ClientSession(BaseClientSession):
self.response_time: Optional[float] = None self.response_time: Optional[float] = None
self.connection_latency: Optional[float] = None self.connection_latency: Optional[float] = None
self._response_samples = 0 self._response_samples = 0
self._concurrency = asyncio.Semaphore(16) self._concurrency = asyncio.Semaphore(concurrency)
@property
def concurrency(self):
return self._concurrency._value
@property @property
def available(self): def available(self):
@ -262,7 +266,8 @@ class Network:
async def connect_to_fastest(self) -> Optional[ClientSession]: async def connect_to_fastest(self) -> Optional[ClientSession]:
fastest_spvs = await self.get_n_fastest_spvs() fastest_spvs = await self.get_n_fastest_spvs()
for (host, port) in fastest_spvs: for (host, port) in fastest_spvs:
client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout']) client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'],
concurrency=self.config['concurrent_hub_requests'])
try: try:
await client.create_connection() await client.create_connection()
log.warning("Connected to spv server %s:%i", host, port) log.warning("Connected to spv server %s:%i", host, port)

View file

@ -133,6 +133,7 @@ class WalletNode:
'default_servers': [(spv_node.hostname, spv_node.port)], 'default_servers': [(spv_node.hostname, spv_node.port)],
'data_path': self.data_path, 'data_path': self.data_path,
'hub_timeout': 30, 'hub_timeout': 30,
'concurrent_hub_requests': 32,
} }
}, },
'wallets': [wallet_file_name] 'wallets': [wallet_file_name]

View file

@ -139,14 +139,18 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.on_connected.first await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected)
async def test_timeout_propagated_from_config(self): async def test_timeout_and_concurrency_propagated_from_config(self):
conf = Config() conf = Config()
self.assertEqual(self.ledger.network.client.timeout, 30) self.assertEqual(self.ledger.network.client.timeout, 30)
self.assertEqual(self.ledger.network.client.concurrency, 32)
conf.hub_timeout = 123.0 conf.hub_timeout = 123.0
conf.concurrent_hub_requests = 42
conf.lbryum_servers = self.ledger.config['default_servers'] conf.lbryum_servers = self.ledger.config['default_servers']
conf.wallet_dir = self.ledger.config['data_path']
self.manager.config = conf self.manager.config = conf
await self.manager.reset() await self.manager.reset()
self.assertEqual(self.ledger.network.client.timeout, 123) self.assertEqual(self.ledger.network.client.timeout, 123)
self.assertEqual(self.ledger.network.client.concurrency, 42)
# async def test_online_but_still_unavailable(self): # async def test_online_but_still_unavailable(self):
# # Edge case. See issue #2445 for context # # Edge case. See issue #2445 for context