Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Victor Shyba
92258a2764 add concurrent_hub_requests conf 2021-05-04 22:38:59 -03:00
Victor Shyba
ac88ada50e move request semaphore to session and apply to all requests 2021-05-04 22:38:46 -03:00
Victor Shyba
e7fdf2aeb3 add hub_timeout and propagate it to network code 2021-05-04 22:38:39 -03:00
5 changed files with 44 additions and 19 deletions

View file

@ -544,6 +544,7 @@ class Config(CLIConfig):
# protocol timeouts # protocol timeouts
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0) blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0)
hub_timeout = Float("Timeout when making a hub request", 30.0)
peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0)
node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT) node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT)
@ -565,6 +566,7 @@ class Config(CLIConfig):
"Maximum number of peers to connect to while downloading a blob", 4, "Maximum number of peers to connect to while downloading a blob", 4,
previous_names=['max_connections_per_stream'] previous_names=['max_connections_per_stream']
) )
concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32)
fixed_peer_delay = Float( fixed_peer_delay = Float(
"Amount of seconds before adding the reflector servers as potential peers to download from in case dht" "Amount of seconds before adding the reflector servers as potential peers to download from in case dht"
"peers are not found or are slow", 2.0 "peers are not found or are slow", 2.0

View file

@ -182,7 +182,9 @@ class WalletManager:
ledger_config = { ledger_config = {
'auto_connect': True, 'auto_connect': True,
'hub_timeout': config.hub_timeout,
'default_servers': config.lbryum_servers, 'default_servers': config.lbryum_servers,
'concurrent_hub_requests': config.concurrent_hub_requests,
'data_path': config.wallet_dir, 'data_path': config.wallet_dir,
'tx_cache_size': config.transaction_cache_size 'tx_cache_size': config.transaction_cache_size
} }
@ -226,6 +228,8 @@ class WalletManager:
self.ledger.config = { self.ledger.config = {
'auto_connect': True, 'auto_connect': True,
'default_servers': self.config.lbryum_servers, 'default_servers': self.config.lbryum_servers,
'hub_timeout': self.config.hub_timeout,
'concurrent_hub_requests': self.config.concurrent_hub_requests,
'data_path': self.config.wallet_dir, 'data_path': self.config.wallet_dir,
} }
await self.ledger.stop() await self.ledger.stop()

View file

@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
class ClientSession(BaseClientSession): class ClientSession(BaseClientSession):
def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs): def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs):
self.network = network self.network = network
self.server = server self.server = server
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -29,7 +29,11 @@ class ClientSession(BaseClientSession):
self.response_time: Optional[float] = None self.response_time: Optional[float] = None
self.connection_latency: Optional[float] = None self.connection_latency: Optional[float] = None
self._response_samples = 0 self._response_samples = 0
self.pending_amount = 0 self._concurrency = asyncio.Semaphore(concurrency)
@property
def concurrency(self):
return self._concurrency._value
@property @property
def available(self): def available(self):
@ -55,9 +59,9 @@ class ClientSession(BaseClientSession):
return result return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
self.pending_amount += 1
log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout) log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
try: try:
await self._concurrency.acquire()
if method == 'server.version': if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout) return await self.send_timed_server_version_request(args, self.timeout)
request = asyncio.ensure_future(super().send_request(method, args)) request = asyncio.ensure_future(super().send_request(method, args))
@ -91,7 +95,7 @@ class ClientSession(BaseClientSession):
# self.synchronous_close() # self.synchronous_close()
raise raise
finally: finally:
self.pending_amount -= 1 self._concurrency.release()
async def ensure_server_version(self, required=None, timeout=3): async def ensure_server_version(self, required=None, timeout=3):
required = required or self.network.PROTOCOL_VERSION required = required or self.network.PROTOCOL_VERSION
@ -154,7 +158,6 @@ class Network:
# self._switch_task: Optional[asyncio.Task] = None # self._switch_task: Optional[asyncio.Task] = None
self.running = False self.running = False
self.remote_height: int = 0 self.remote_height: int = 0
self._concurrency = asyncio.Semaphore(16)
self._on_connected_controller = StreamController() self._on_connected_controller = StreamController()
self.on_connected = self._on_connected_controller.stream self.on_connected = self._on_connected_controller.stream
@ -263,7 +266,8 @@ class Network:
async def connect_to_fastest(self) -> Optional[ClientSession]: async def connect_to_fastest(self) -> Optional[ClientSession]:
fastest_spvs = await self.get_n_fastest_spvs() fastest_spvs = await self.get_n_fastest_spvs()
for (host, port) in fastest_spvs: for (host, port) in fastest_spvs:
client = ClientSession(network=self, server=(host, port)) client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'],
concurrency=self.config['concurrent_hub_requests'])
try: try:
await client.create_connection() await client.create_connection()
log.warning("Connected to spv server %s:%i", host, port) log.warning("Connected to spv server %s:%i", host, port)
@ -344,18 +348,17 @@ class Network:
raise ConnectionError("Attempting to send rpc request when connection is not available.") raise ConnectionError("Attempting to send rpc request when connection is not available.")
async def retriable_call(self, function, *args, **kwargs): async def retriable_call(self, function, *args, **kwargs):
async with self._concurrency: while self.running:
while self.running: if not self.is_connected:
if not self.is_connected: log.warning("Wallet server unavailable, waiting for it to come back and retry.")
log.warning("Wallet server unavailable, waiting for it to come back and retry.") self._urgent_need_reconnect.set()
self._urgent_need_reconnect.set() await self.on_connected.first
await self.on_connected.first try:
try: return await function(*args, **kwargs)
return await function(*args, **kwargs) except asyncio.TimeoutError:
except asyncio.TimeoutError: log.warning("Wallet server call timed out, retrying.")
log.warning("Wallet server call timed out, retrying.") except ConnectionError:
except ConnectionError: log.warning("connection error")
log.warning("connection error")
raise asyncio.CancelledError() # if we got here, we are shutting down raise asyncio.CancelledError() # if we got here, we are shutting down

View file

@ -131,7 +131,9 @@ class WalletNode:
self.ledger_class.get_id(): { self.ledger_class.get_id(): {
'api_port': self.port, 'api_port': self.port,
'default_servers': [(spv_node.hostname, spv_node.port)], 'default_servers': [(spv_node.hostname, spv_node.port)],
'data_path': self.data_path 'data_path': self.data_path,
'hub_timeout': 30,
'concurrent_hub_requests': 32,
} }
}, },
'wallets': [wallet_file_name] 'wallets': [wallet_file_name]

View file

@ -9,6 +9,7 @@ from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.rpc import RPCSession from lbry.wallet.rpc import RPCSession
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
from lbry.testcase import IntegrationTestCase, AsyncioTestCase from lbry.testcase import IntegrationTestCase, AsyncioTestCase
from lbry.conf import Config
class NetworkTests(IntegrationTestCase): class NetworkTests(IntegrationTestCase):
@ -138,6 +139,19 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.on_connected.first await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected)
async def test_timeout_and_concurrency_propagated_from_config(self):
conf = Config()
self.assertEqual(self.ledger.network.client.timeout, 30)
self.assertEqual(self.ledger.network.client.concurrency, 32)
conf.hub_timeout = 123.0
conf.concurrent_hub_requests = 42
conf.lbryum_servers = self.ledger.config['default_servers']
conf.wallet_dir = self.ledger.config['data_path']
self.manager.config = conf
await self.manager.reset()
self.assertEqual(self.ledger.network.client.timeout, 123)
self.assertEqual(self.ledger.network.client.concurrency, 42)
# async def test_online_but_still_unavailable(self): # async def test_online_but_still_unavailable(self):
# # Edge case. See issue #2445 for context # # Edge case. See issue #2445 for context
# self.assertIsNotNone(self.ledger.network.session_pool.fastest_session) # self.assertIsNotNone(self.ledger.network.session_pool.fastest_session)