Merge pull request #3232 from lbryio/timeout

This commit is contained in:
Alex Grin 2021-06-25 11:05:25 -04:00 committed by GitHub
commit 115034fccb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 19 deletions

View file

@ -628,6 +628,7 @@ class Config(CLIConfig):
# protocol timeouts # protocol timeouts
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0) blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0)
hub_timeout = Float("Timeout when making a hub request", 30.0)
peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0)
node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT) node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT)
@ -649,6 +650,7 @@ class Config(CLIConfig):
"Maximum number of peers to connect to while downloading a blob", 4, "Maximum number of peers to connect to while downloading a blob", 4,
previous_names=['max_connections_per_stream'] previous_names=['max_connections_per_stream']
) )
concurrent_hub_requests = Integer("Maximum number of concurrent hub requests", 32)
fixed_peer_delay = Float( fixed_peer_delay = Float(
"Amount of seconds before adding the reflector servers as potential peers to download from in case dht" "Amount of seconds before adding the reflector servers as potential peers to download from in case dht"
"peers are not found or are slow", 2.0 "peers are not found or are slow", 2.0

BIN
lbry/wallet/__init__.pyc Normal file

Binary file not shown.

View file

@ -183,9 +183,11 @@ class WalletManager:
ledger_config = { ledger_config = {
'auto_connect': True, 'auto_connect': True,
'explicit_servers': [], 'explicit_servers': [],
'hub_timeout': config.hub_timeout,
'default_servers': config.lbryum_servers, 'default_servers': config.lbryum_servers,
'known_hubs': config.known_hubs, 'known_hubs': config.known_hubs,
'jurisdiction': config.jurisdiction, 'jurisdiction': config.jurisdiction,
'concurrent_hub_requests': config.concurrent_hub_requests,
'data_path': config.wallet_dir, 'data_path': config.wallet_dir,
'tx_cache_size': config.transaction_cache_size 'tx_cache_size': config.transaction_cache_size
} }
@ -236,6 +238,8 @@ class WalletManager:
'default_servers': Config.lbryum_servers.default, 'default_servers': Config.lbryum_servers.default,
'known_hubs': self.config.known_hubs, 'known_hubs': self.config.known_hubs,
'jurisdiction': self.config.jurisdiction, 'jurisdiction': self.config.jurisdiction,
'hub_timeout': self.config.hub_timeout,
'concurrent_hub_requests': self.config.concurrent_hub_requests,
'data_path': self.config.wallet_dir, 'data_path': self.config.wallet_dir,
} }
if Config.lbryum_servers.is_set(self.config): if Config.lbryum_servers.is_set(self.config):

View file

@ -20,7 +20,7 @@ log = logging.getLogger(__name__)
class ClientSession(BaseClientSession): class ClientSession(BaseClientSession):
def __init__(self, *args, network: 'Network', server, timeout=30, **kwargs): def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs):
self.network = network self.network = network
self.server = server self.server = server
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -30,7 +30,11 @@ class ClientSession(BaseClientSession):
self.response_time: Optional[float] = None self.response_time: Optional[float] = None
self.connection_latency: Optional[float] = None self.connection_latency: Optional[float] = None
self._response_samples = 0 self._response_samples = 0
self.pending_amount = 0 self._concurrency = asyncio.Semaphore(concurrency)
@property
def concurrency(self):
return self._concurrency._value
@property @property
def available(self): def available(self):
@ -56,9 +60,9 @@ class ClientSession(BaseClientSession):
return result return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
self.pending_amount += 1
log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout) log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
try: try:
await self._concurrency.acquire()
if method == 'server.version': if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout) return await self.send_timed_server_version_request(args, self.timeout)
request = asyncio.ensure_future(super().send_request(method, args)) request = asyncio.ensure_future(super().send_request(method, args))
@ -92,7 +96,7 @@ class ClientSession(BaseClientSession):
# self.synchronous_close() # self.synchronous_close()
raise raise
finally: finally:
self.pending_amount -= 1 self._concurrency.release()
async def ensure_server_version(self, required=None, timeout=3): async def ensure_server_version(self, required=None, timeout=3):
required = required or self.network.PROTOCOL_VERSION required = required or self.network.PROTOCOL_VERSION
@ -155,7 +159,6 @@ class Network:
# self._switch_task: Optional[asyncio.Task] = None # self._switch_task: Optional[asyncio.Task] = None
self.running = False self.running = False
self.remote_height: int = 0 self.remote_height: int = 0
self._concurrency = asyncio.Semaphore(16)
self._on_connected_controller = StreamController() self._on_connected_controller = StreamController()
self.on_connected = self._on_connected_controller.stream self.on_connected = self._on_connected_controller.stream
@ -293,7 +296,8 @@ class Network:
if (pong is not None and self.jurisdiction is not None) and \ if (pong is not None and self.jurisdiction is not None) and \
(pong.country_name != self.jurisdiction): (pong.country_name != self.jurisdiction):
continue continue
client = ClientSession(network=self, server=(host, port)) client = ClientSession(network=self, server=(host, port), timeout=self.config['hub_timeout'],
concurrency=self.config['concurrent_hub_requests'])
try: try:
await client.create_connection() await client.create_connection()
log.warning("Connected to spv server %s:%i", host, port) log.warning("Connected to spv server %s:%i", host, port)
@ -376,18 +380,17 @@ class Network:
raise ConnectionError("Attempting to send rpc request when connection is not available.") raise ConnectionError("Attempting to send rpc request when connection is not available.")
async def retriable_call(self, function, *args, **kwargs): async def retriable_call(self, function, *args, **kwargs):
async with self._concurrency: while self.running:
while self.running: if not self.is_connected:
if not self.is_connected: log.warning("Wallet server unavailable, waiting for it to come back and retry.")
log.warning("Wallet server unavailable, waiting for it to come back and retry.") self._urgent_need_reconnect.set()
self._urgent_need_reconnect.set() await self.on_connected.first
await self.on_connected.first try:
try: return await function(*args, **kwargs)
return await function(*args, **kwargs) except asyncio.TimeoutError:
except asyncio.TimeoutError: log.warning("Wallet server call timed out, retrying.")
log.warning("Wallet server call timed out, retrying.") except ConnectionError:
except ConnectionError: log.warning("connection error")
log.warning("connection error")
raise asyncio.CancelledError() # if we got here, we are shutting down raise asyncio.CancelledError() # if we got here, we are shutting down

View file

@ -135,7 +135,9 @@ class WalletNode:
'explicit_servers': [(spv_node.hostname, spv_node.port)], 'explicit_servers': [(spv_node.hostname, spv_node.port)],
'default_servers': Config.lbryum_servers.default, 'default_servers': Config.lbryum_servers.default,
'data_path': self.data_path, 'data_path': self.data_path,
'known_hubs': config.known_hubs if config else KnownHubsList() 'known_hubs': config.known_hubs if config else KnownHubsList(),
'hub_timeout': 30,
'concurrent_hub_requests': 32,
} }
}, },
'wallets': [wallet_file_name] 'wallets': [wallet_file_name]

View file

@ -9,6 +9,7 @@ from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.rpc import RPCSession from lbry.wallet.rpc import RPCSession
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
from lbry.testcase import IntegrationTestCase, AsyncioTestCase from lbry.testcase import IntegrationTestCase, AsyncioTestCase
from lbry.conf import Config
class NetworkTests(IntegrationTestCase): class NetworkTests(IntegrationTestCase):
@ -138,6 +139,19 @@ class ReconnectTests(IntegrationTestCase):
await self.ledger.network.on_connected.first await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected)
async def test_timeout_and_concurrency_propagated_from_config(self):
conf = Config()
self.assertEqual(self.ledger.network.client.timeout, 30)
self.assertEqual(self.ledger.network.client.concurrency, 32)
conf.hub_timeout = 123.0
conf.concurrent_hub_requests = 42
conf.known_hubs = self.ledger.config['known_hubs']
conf.wallet_dir = self.ledger.config['data_path']
self.manager.config = conf
await self.manager.reset()
self.assertEqual(self.ledger.network.client.timeout, 123)
self.assertEqual(self.ledger.network.client.concurrency, 42)
# async def test_online_but_still_unavailable(self): # async def test_online_but_still_unavailable(self):
# # Edge case. See issue #2445 for context # # Edge case. See issue #2445 for context
# self.assertIsNotNone(self.ledger.network.session_pool.fastest_session) # self.assertIsNotNone(self.ledger.network.session_pool.fastest_session)