add urgent reconnect when api is called to bypass retry delay

This commit is contained in:
Victor Shyba 2019-08-12 14:00:53 -03:00
parent 4ead92cfbe
commit 011b7f090f

View file

@ -2,7 +2,7 @@ import logging
import asyncio import asyncio
from operator import itemgetter from operator import itemgetter
from typing import Dict, Optional from typing import Dict, Optional
from time import perf_counter as time from time import time, perf_counter
from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError
@ -25,6 +25,7 @@ class ClientSession(BaseClientSession):
self.max_seconds_idle = timeout * 2 self.max_seconds_idle = timeout * 2
self.latency = 1 << 32 self.latency = 1 << 32
self._on_connect_cb = on_connect_callback or (lambda: None) self._on_connect_cb = on_connect_callback or (lambda: None)
self.trigger_urgent_reconnect = asyncio.Event()
@property @property
def available(self): def available(self):
@ -32,11 +33,11 @@ class ClientSession(BaseClientSession):
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
try: try:
start = time() start = perf_counter()
result = await asyncio.wait_for( result = await asyncio.wait_for(
super().send_request(method, args), timeout=self.timeout super().send_request(method, args), timeout=self.timeout
) )
self.latency = time() - start self.latency = perf_counter() - start
return result return result
except RPCError as e: except RPCError as e:
log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args)
@ -62,7 +63,12 @@ class ClientSession(BaseClientSession):
await self.close() await self.close()
retry_delay = min(60, retry_delay * 2) retry_delay = min(60, retry_delay * 2)
log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server)
await asyncio.sleep(retry_delay) try:
await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay)
except asyncio.TimeoutError:
pass
finally:
self.trigger_urgent_reconnect.clear()
def ensure_server_version(self, required='1.2'): def ensure_server_version(self, required='1.2'):
return self.send_request('server.version', [__version__, required]) return self.send_request('server.version', [__version__, required])
@ -145,6 +151,7 @@ class BaseNetwork:
if self.is_connected: if self.is_connected:
return self.client.send_request(list_or_method, args) return self.client.send_request(list_or_method, args)
else: else:
self.session_pool.trigger_nodelay_connect()
raise ConnectionError("Attempting to send rpc request when connection is not available.") raise ConnectionError("Attempting to send rpc request when connection is not available.")
def _update_remote_height(self, header_args): def _update_remote_height(self, header_args):
@ -219,8 +226,15 @@ class SessionPool:
task.add_done_callback(lambda _: self.ensure_connections()) task.add_done_callback(lambda _: self.ensure_connections())
self.sessions[session] = task self.sessions[session] = task
def trigger_nodelay_connect(self):
# used when other parts of the system sees we might have internet back
# bypasses the retry interval
for session in self.sessions:
session.trigger_urgent_reconnect.set()
async def wait_for_fastest_session(self): async def wait_for_fastest_session(self):
while not self.fastest_session: while not self.fastest_session:
self.trigger_nodelay_connect()
self.new_connection_event.clear() self.new_connection_event.clear()
await self.new_connection_event.wait() await self.new_connection_event.wait()
return self.fastest_session return self.fastest_session