From 6b3c4c70d2abd10662d6db018046fea5411faf72 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 9 Dec 2019 06:01:56 -0300 Subject: [PATCH] dont timeout if data being transferred --- torba/torba/client/basenetwork.py | 52 +++++++++++++++---------------- torba/torba/rpc/session.py | 2 ++ 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 7fc072357..ad18b6e15 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -29,8 +29,6 @@ class ClientSession(BaseClientSession): self.pending_amount = 0 self._on_connect_cb = on_connect_callback or (lambda: None) self.trigger_urgent_reconnect = asyncio.Event() - # one request per second of timeout, conservative default - self._semaphore = asyncio.Semaphore(self.timeout * 2) @property def available(self): @@ -57,20 +55,21 @@ class ClientSession(BaseClientSession): async def send_request(self, method, args=()): self.pending_amount += 1 - async with self._semaphore: - return await self._send_request(method, args) - - async def _send_request(self, method, args=()): log.debug("send %s to %s:%i", method, *self.server) try: if method == 'server.version': - reply = await self.send_timed_server_version_request(args, self.timeout) - else: - reply = await asyncio.wait_for( - super().send_request(method, args), timeout=self.timeout - ) - log.debug("got reply for %s from %s:%i", method, *self.server) - return reply + return await self.send_timed_server_version_request(args, self.timeout) + request = asyncio.ensure_future(super().send_request(method, args)) + while not request.done(): + done, pending = await asyncio.wait([request], timeout=self.timeout) + if pending: + log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received) + if (perf_counter() - self.last_packet_received) < self.timeout: + continue + log.info("timeout sending %s to %s:%i", method, *self.server) + raise asyncio.TimeoutError + if done: + return request.result() except (RPCError, ProtocolError) as e: log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) @@ -79,9 +78,6 @@ class ClientSession(BaseClientSession): log.warning("connection to %s:%i lost", *self.server) self.synchronous_close() raise - except asyncio.TimeoutError: - log.info("timeout sending %s to %s:%i", method, *self.server) - raise except asyncio.CancelledError: log.info("cancelled sending %s to %s:%i", method, *self.server) self.synchronous_close() @@ -152,6 +148,7 @@ class BaseNetwork: self._switch_task: Optional[asyncio.Task] = None self.running = False self.remote_height: int = 0 + self._concurrency = asyncio.Semaphore(16) self._on_connected_controller = StreamController() self.on_connected = self._on_connected_controller.stream @@ -212,17 +209,18 @@ class BaseNetwork: raise ConnectionError("Attempting to send rpc request when connection is not available.") async def retriable_call(self, function, *args, **kwargs): - while self.running: - if not self.is_connected: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") - await self.on_connected.first - await self.session_pool.wait_for_fastest_session() - try: - return await function(*args, **kwargs) - except asyncio.TimeoutError: - log.warning("Wallet server call timed out, retrying.") - except ConnectionError: - pass + async with self._concurrency: + while self.running: + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + await self.on_connected.first + await self.session_pool.wait_for_fastest_session() + try: + return await function(*args, **kwargs) + except asyncio.TimeoutError: + log.warning("Wallet server call timed out, retrying.") + except ConnectionError: + pass raise asyncio.CancelledError() # if we got here, we are shutting down def _update_remote_height(self, header_args): diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index ea82e56c0..1b93633f0 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -111,6 +111,7 @@ class SessionBase(asyncio.Protocol): self.recv_count = 0 self.recv_size = 0 self.last_recv = self.start_time + self.last_packet_received = self.start_time # Bandwidth usage per hour before throttling starts self.bw_limit = 2000000 self.bw_time = self.start_time @@ -174,6 +175,7 @@ class SessionBase(asyncio.Protocol): # asyncio framework def data_received(self, framed_message): """Called by asyncio when a message comes in.""" + self.last_packet_received = time.perf_counter() if self.verbosity >= 4: self.logger.debug(f'Received framed message {framed_message}') self.recv_size += len(framed_message)