Compare commits

...

3 commits

Author SHA1 Message Date
Victor Shyba
92258a2764 add concurrent_hub_requests conf 2021-05-04 22:38:59 -03:00
Victor Shyba
ac88ada50e move request semaphore to session and apply to all requests 2021-05-04 22:38:46 -03:00
Victor Shyba
e7fdf2aeb3 add hub_timeout and propagate it to network code 2021-05-04 22:38:39 -03:00
5 changed files with 44 additions and 19 deletions

View file

@ -544,6 +544,7 @@ class Config(CLIConfig):
# protocol timeouts
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0)
hub_timeout = Float("Timeout when making a hub request", 30.0)
peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0)
node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT)
@ -565,6 +566,7 @@ class Config(CLIConfig):
"Maximum number of peers to connect to while downloading a blob", 4,
previous_names=['max_connections_per_stream']
)
concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32)
fixed_peer_delay = Float(
"Amount of seconds before adding the reflector servers as potential peers to download from in case dht"
"peers are not found or are slow", 2.0

View file

@ -182,7 +182,9 @@ class WalletManager:
ledger_config = {
'auto_connect': True,
'hub_timeout': config.hub_timeout,
'default_servers': config.lbryum_servers,
'concurrent_hub_requests': config.concurrent_hub_requests,
'data_path': config.wallet_dir,
'tx_cache_size': config.transaction_cache_size
}
@ -226,6 +228,8 @@ class WalletManager:
self.ledger.config = {
'auto_connect': True,
'default_servers': self.config.lbryum_servers,
'hub_timeout': self.config.hub_timeout,
'concurrent_hub_requests': self.config.concurrent_hub_requests,
'data_path': self.config.wallet_dir,
}
await self.ledger.stop()

View file

@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
class ClientSession(BaseClientSession):
def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs):
def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs):
self.network = network
self.server = server
super().__init__(*args, **kwargs)
@ -29,7 +29,11 @@ class ClientSession(BaseClientSession):
self.response_time: Optional[float] = None
self.connection_latency: Optional[float] = None
self._response_samples = 0
self.pending_amount = 0
self._concurrency = asyncio.Semaphore(concurrency)
@property
def concurrency(self):
return self._concurrency._value
@property
def available(self):
@ -55,9 +59,9 @@ class ClientSession(BaseClientSession):
return result
async def send_request(self, method, args=()):
self.pending_amount += 1
log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
try:
await self._concurrency.acquire()
if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout)
request = asyncio.ensure_future(super().send_request(method, args))
@ -91,7 +95,7 @@ class ClientSession(BaseClientSession):
# self.synchronous_close()
raise
finally:
self.pending_amount -= 1
self._concurrency.release()
async def ensure_server_version(self, required=None, timeout=3):
required = required or self.network.PROTOCOL_VERSION
@ -154,7 +158,6 @@ class Network:
# self._switch_task: Optional[asyncio.Task] = None
self.running = False
self.remote_height: int = 0
self._concurrency = asyncio.Semaphore(16)
self._on_connected_controller = StreamController()
self.on_connected = self._on_connected_controller.stream
@ -263,7 +266,8 @@ class Network:
async def connect_to_fastest(self) -> Optional[ClientSession]:
fastest_spvs = await self.get_n_fastest_spvs()
for (host, port) in fastest_spvs:
client = ClientSession(network=self, server=(host, port))
client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'],
concurrency=self.config['concurrent_hub_requests'])
try:
await client.create_connection()
log.warning("Connected to spv server %s:%i", host, port)
@ -344,18 +348,17 @@ class Network:
raise ConnectionError("Attempting to send rpc request when connection is not available.")
async def retriable_call(self, function, *args, **kwargs):
async with self._concurrency:
while self.running:
if not self.is_connected:
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
self._urgent_need_reconnect.set()
await self.on_connected.first
try:
return await function(*args, **kwargs)
except asyncio.TimeoutError:
log.warning("Wallet server call timed out, retrying.")
except ConnectionError:
log.warning("connection error")
while self.running:
if not self.is_connected:
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
self._urgent_need_reconnect.set()
await self.on_connected.first
try:
return await function(*args, **kwargs)
except asyncio.TimeoutError:
log.warning("Wallet server call timed out, retrying.")
except ConnectionError:
log.warning("connection error")
raise asyncio.CancelledError() # if we got here, we are shutting down

View file

@ -131,7 +131,9 @@ class WalletNode:
self.ledger_class.get_id(): {
'api_port': self.port,
'default_servers': [(spv_node.hostname, spv_node.port)],
'data_path': self.data_path
'data_path': self.data_path,
'hub_timeout': 30,
'concurrent_hub_requests': 32,
}
},
'wallets': [wallet_file_name]

View file

@ -9,6 +9,7 @@ from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.rpc import RPCSession
from lbry.wallet.server.udp import StatusServer
from lbry.testcase import IntegrationTestCase, AsyncioTestCase
from lbry.conf import Config
class NetworkTests(IntegrationTestCase):
@ -138,6 +139,19 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected)
async def test_timeout_and_concurrency_propagated_from_config(self):
conf = Config()
self.assertEqual(self.ledger.network.client.timeout, 30)
self.assertEqual(self.ledger.network.client.concurrency, 32)
conf.hub_timeout = 123.0
conf.concurrent_hub_requests = 42
conf.lbryum_servers = self.ledger.config['default_servers']
conf.wallet_dir = self.ledger.config['data_path']
self.manager.config = conf
await self.manager.reset()
self.assertEqual(self.ledger.network.client.timeout, 123)
self.assertEqual(self.ledger.network.client.concurrency, 42)
# async def test_online_but_still_unavailable(self):
# # Edge case. See issue #2445 for context
# self.assertIsNotNone(self.ledger.network.session_pool.fastest_session)