From 011b7f090fb5defb8ef2ec4935c62cf0486020eb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 12 Aug 2019 14:00:53 -0300 Subject: [PATCH] add urgent reconnect when api is called to bypass retry delay --- torba/torba/client/basenetwork.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index f145907a5..752c2ed90 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -2,7 +2,7 @@ import logging import asyncio from operator import itemgetter from typing import Dict, Optional -from time import perf_counter as time +from time import time, perf_counter from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -25,6 +25,7 @@ class ClientSession(BaseClientSession): self.max_seconds_idle = timeout * 2 self.latency = 1 << 32 self._on_connect_cb = on_connect_callback or (lambda: None) + self.trigger_urgent_reconnect = asyncio.Event() @property def available(self): @@ -32,11 +33,11 @@ class ClientSession(BaseClientSession): async def send_request(self, method, args=()): try: - start = time() + start = perf_counter() result = await asyncio.wait_for( super().send_request(method, args), timeout=self.timeout ) - self.latency = time() - start + self.latency = perf_counter() - start return result except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) @@ -62,7 +63,12 @@ class ClientSession(BaseClientSession): await self.close() retry_delay = min(60, retry_delay * 2) log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) - await asyncio.sleep(retry_delay) + try: + await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay) + except asyncio.TimeoutError: + pass + finally: + self.trigger_urgent_reconnect.clear() def ensure_server_version(self, required='1.2'): return self.send_request('server.version', [__version__, required]) @@ -145,6 +151,7 @@ class BaseNetwork: if self.is_connected: return self.client.send_request(list_or_method, args) else: + self.session_pool.trigger_nodelay_connect() raise ConnectionError("Attempting to send rpc request when connection is not available.") def _update_remote_height(self, header_args): @@ -219,8 +226,15 @@ class SessionPool: task.add_done_callback(lambda _: self.ensure_connections()) self.sessions[session] = task + def trigger_nodelay_connect(self): + # used when other parts of the system sees we might have internet back + # bypasses the retry interval + for session in self.sessions: + session.trigger_urgent_reconnect.set() + async def wait_for_fastest_session(self): while not self.fastest_session: + self.trigger_nodelay_connect() self.new_connection_event.clear() await self.new_connection_event.wait() return self.fastest_session