add concurrent_hub_requests conf

This commit is contained in:
Victor Shyba 2021-03-13 02:49:33 -03:00
parent d7007e402e
commit 309e957a85
5 changed files with 19 additions and 7 deletions

View file

@ -650,6 +650,7 @@ class Config(CLIConfig):
"Maximum number of peers to connect to while downloading a blob", 4,
previous_names=['max_connections_per_stream']
)
concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32)
fixed_peer_delay = Float(
"Amount of seconds before adding the reflector servers as potential peers to download from in case dht"
"peers are not found or are slow", 2.0

View file

@ -187,6 +187,7 @@ class WalletManager:
'default_servers': config.lbryum_servers,
'known_hubs': config.known_hubs,
'jurisdiction': config.jurisdiction,
'concurrent_hub_requests': config.concurrent_hub_requests,
'data_path': config.wallet_dir,
'tx_cache_size': config.transaction_cache_size
}
@ -238,6 +239,7 @@ class WalletManager:
'known_hubs': self.config.known_hubs,
'jurisdiction': self.config.jurisdiction,
'hub_timeout': self.config.hub_timeout,
'concurrent_hub_requests': self.config.concurrent_hub_requests,
'data_path': self.config.wallet_dir,
}
if Config.lbryum_servers.is_set(self.config):

View file

@ -20,7 +20,7 @@ log = logging.getLogger(__name__)
class ClientSession(BaseClientSession):
def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs):
def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs):
self.network = network
self.server = server
super().__init__(*args, **kwargs)
@ -30,7 +30,11 @@ class ClientSession(BaseClientSession):
self.response_time: Optional[float] = None
self.connection_latency: Optional[float] = None
self._response_samples = 0
self._concurrency = asyncio.Semaphore(16)
self._concurrency = asyncio.Semaphore(concurrency)
@property
def concurrency(self):
return self._concurrency._value
@property
def available(self):
@ -292,8 +296,8 @@ class Network:
if (pong is not None and self.jurisdiction is not None) and \
(pong.country_name != self.jurisdiction):
continue
client = ClientSession(network=self, server=(host, port))
client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'])
client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'],
concurrency=self.config['concurrent_hub_requests'])
try:
await client.create_connection()
log.warning("Connected to spv server %s:%i", host, port)

View file

@ -135,8 +135,9 @@ class WalletNode:
'explicit_servers': [(spv_node.hostname, spv_node.port)],
'default_servers': Config.lbryum_servers.default,
'data_path': self.data_path,
'known_hubs': config.known_hubs if config else KnownHubsList()
'known_hubs': config.known_hubs if config else KnownHubsList(),
'hub_timeout': 30,
'concurrent_hub_requests': 32,
}
},
'wallets': [wallet_file_name]

View file

@ -139,14 +139,18 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected)
async def test_timeout_propagated_from_config(self):
async def test_timeout_and_concurrency_propagated_from_config(self):
conf = Config()
self.assertEqual(self.ledger.network.client.timeout, 30)
self.assertEqual(self.ledger.network.client.concurrency, 32)
conf.hub_timeout = 123.0
conf.lbryum_servers = self.ledger.config['default_servers']
conf.concurrent_hub_requests = 42
conf.known_hubs = self.ledger.config['known_hubs']
conf.wallet_dir = self.ledger.config['data_path']
self.manager.config = conf
await self.manager.reset()
self.assertEqual(self.ledger.network.client.timeout, 123)
self.assertEqual(self.ledger.network.client.concurrency, 42)
# async def test_online_but_still_unavailable(self):
# # Edge case. See issue #2445 for context