forked from LBRYCommunity/lbry-sdk
dont timeout if data being transferred
This commit is contained in:
parent
0292fd8d91
commit
6b3c4c70d2
2 changed files with 27 additions and 27 deletions
|
@ -29,8 +29,6 @@ class ClientSession(BaseClientSession):
|
|||
self.pending_amount = 0
|
||||
self._on_connect_cb = on_connect_callback or (lambda: None)
|
||||
self.trigger_urgent_reconnect = asyncio.Event()
|
||||
# one request per second of timeout, conservative default
|
||||
self._semaphore = asyncio.Semaphore(self.timeout * 2)
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
|
@ -57,20 +55,21 @@ class ClientSession(BaseClientSession):
|
|||
|
||||
async def send_request(self, method, args=()):
|
||||
self.pending_amount += 1
|
||||
async with self._semaphore:
|
||||
return await self._send_request(method, args)
|
||||
|
||||
async def _send_request(self, method, args=()):
|
||||
log.debug("send %s to %s:%i", method, *self.server)
|
||||
try:
|
||||
if method == 'server.version':
|
||||
reply = await self.send_timed_server_version_request(args, self.timeout)
|
||||
else:
|
||||
reply = await asyncio.wait_for(
|
||||
super().send_request(method, args), timeout=self.timeout
|
||||
)
|
||||
log.debug("got reply for %s from %s:%i", method, *self.server)
|
||||
return reply
|
||||
return await self.send_timed_server_version_request(args, self.timeout)
|
||||
request = asyncio.ensure_future(super().send_request(method, args))
|
||||
while not request.done():
|
||||
done, pending = await asyncio.wait([request], timeout=self.timeout)
|
||||
if pending:
|
||||
log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received)
|
||||
if (perf_counter() - self.last_packet_received) < self.timeout:
|
||||
continue
|
||||
log.info("timeout sending %s to %s:%i", method, *self.server)
|
||||
raise asyncio.TimeoutError
|
||||
if done:
|
||||
return request.result()
|
||||
except (RPCError, ProtocolError) as e:
|
||||
log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s",
|
||||
*self.server, *e.args)
|
||||
|
@ -79,9 +78,6 @@ class ClientSession(BaseClientSession):
|
|||
log.warning("connection to %s:%i lost", *self.server)
|
||||
self.synchronous_close()
|
||||
raise
|
||||
except asyncio.TimeoutError:
|
||||
log.info("timeout sending %s to %s:%i", method, *self.server)
|
||||
raise
|
||||
except asyncio.CancelledError:
|
||||
log.info("cancelled sending %s to %s:%i", method, *self.server)
|
||||
self.synchronous_close()
|
||||
|
@ -152,6 +148,7 @@ class BaseNetwork:
|
|||
self._switch_task: Optional[asyncio.Task] = None
|
||||
self.running = False
|
||||
self.remote_height: int = 0
|
||||
self._concurrency = asyncio.Semaphore(16)
|
||||
|
||||
self._on_connected_controller = StreamController()
|
||||
self.on_connected = self._on_connected_controller.stream
|
||||
|
@ -212,17 +209,18 @@ class BaseNetwork:
|
|||
raise ConnectionError("Attempting to send rpc request when connection is not available.")
|
||||
|
||||
async def retriable_call(self, function, *args, **kwargs):
|
||||
while self.running:
|
||||
if not self.is_connected:
|
||||
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
|
||||
await self.on_connected.first
|
||||
await self.session_pool.wait_for_fastest_session()
|
||||
try:
|
||||
return await function(*args, **kwargs)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("Wallet server call timed out, retrying.")
|
||||
except ConnectionError:
|
||||
pass
|
||||
async with self._concurrency:
|
||||
while self.running:
|
||||
if not self.is_connected:
|
||||
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
|
||||
await self.on_connected.first
|
||||
await self.session_pool.wait_for_fastest_session()
|
||||
try:
|
||||
return await function(*args, **kwargs)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("Wallet server call timed out, retrying.")
|
||||
except ConnectionError:
|
||||
pass
|
||||
raise asyncio.CancelledError() # if we got here, we are shutting down
|
||||
|
||||
def _update_remote_height(self, header_args):
|
||||
|
|
|
@ -111,6 +111,7 @@ class SessionBase(asyncio.Protocol):
|
|||
self.recv_count = 0
|
||||
self.recv_size = 0
|
||||
self.last_recv = self.start_time
|
||||
self.last_packet_received = self.start_time
|
||||
# Bandwidth usage per hour before throttling starts
|
||||
self.bw_limit = 2000000
|
||||
self.bw_time = self.start_time
|
||||
|
@ -174,6 +175,7 @@ class SessionBase(asyncio.Protocol):
|
|||
# asyncio framework
|
||||
def data_received(self, framed_message):
|
||||
"""Called by asyncio when a message comes in."""
|
||||
self.last_packet_received = time.perf_counter()
|
||||
if self.verbosity >= 4:
|
||||
self.logger.debug(f'Received framed message {framed_message}')
|
||||
self.recv_size += len(framed_message)
|
||||
|
|
Loading…
Reference in a new issue