fix misc reconnect scenarios

This commit is contained in:
Victor Shyba 2019-08-23 01:50:25 -03:00
parent 6c3147fcf4
commit 10bd3c9dda
2 changed files with 38 additions and 32 deletions

View file

@ -53,18 +53,17 @@ class ClientSession(BaseClientSession):
self._response_samples += 1 self._response_samples += 1
return result return result
async def send_request(self, method, args=(), timeout=None): async def send_request(self, method, args=()):
log.info("send %s to %s:%i", method, *self.server) log.debug("send %s to %s:%i", method, *self.server)
timeout = timeout or self.timeout
self.pending_amount += 1 self.pending_amount += 1
try: try:
if method == 'server.version': if method == 'server.version':
reply = await self.send_timed_server_version_request(args, timeout) reply = await self.send_timed_server_version_request(args, self.timeout)
else: else:
reply = await asyncio.wait_for( reply = await asyncio.wait_for(
super().send_request(method, args), timeout=timeout super().send_request(method, args), timeout=self.timeout
) )
log.info("got reply for %s from %s:%i", method, *self.server) log.debug("got reply for %s from %s:%i", method, *self.server)
return reply return reply
except RPCError as e: except RPCError as e:
log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s",
@ -87,7 +86,7 @@ class ClientSession(BaseClientSession):
async def ensure_session(self): async def ensure_session(self):
# Handles reconnecting and maintaining a session alive # Handles reconnecting and maintaining a session alive
# TODO: change to 'ping' on newer protocol (above 1.2) # TODO: change to 'ping' on newer protocol (above 1.2)
retry_delay = default_delay = 0.1 retry_delay = default_delay = 1.0
while True: while True:
try: try:
if self.is_closing(): if self.is_closing():
@ -109,7 +108,9 @@ class ClientSession(BaseClientSession):
self.trigger_urgent_reconnect.clear() self.trigger_urgent_reconnect.clear()
async def ensure_server_version(self, required='1.2', timeout=3): async def ensure_server_version(self, required='1.2', timeout=3):
return await self.send_request('server.version', [__version__, required], timeout) return await asyncio.wait_for(
self.send_request('server.version', [__version__, required]), timeout=timeout
)
async def create_connection(self, timeout=6): async def create_connection(self, timeout=6):
connector = Connector(lambda: self, *self.server) connector = Connector(lambda: self, *self.server)
@ -165,23 +166,20 @@ class BaseNetwork:
session.synchronous_close() session.synchronous_close()
log.warning("not connected to any wallet servers") log.warning("not connected to any wallet servers")
return return
if not self.client or client.server_address_and_port != self.client.server_address_and_port:
current_client = self.client current_client = self.client
self.client = client self.client = client
log.info("Switching to SPV wallet server: %s:%d", *self.client.server) log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
self._on_connected_controller.add(True)
try: try:
self._update_remote_height((await self.subscribe_headers(),)) self._update_remote_height((await self.subscribe_headers(),))
log.info("Subscribed to headers: %s:%d", *self.client.server) log.info("Subscribed to headers: %s:%d", *self.client.server)
if current_client:
await current_client.close()
log.info("Closed connection to %s:%i", *current_client.server)
except asyncio.TimeoutError: except asyncio.TimeoutError:
if self.client: if self.client:
await self.client.close() await self.client.close()
self.client = current_client self.client = current_client
return return
self._on_connected_controller.add(True) self.session_pool.new_connection_event.clear()
await asyncio.sleep(30) return await self.session_pool.new_connection_event.wait()
async def start(self): async def start(self):
self.running = True self.running = True
@ -208,14 +206,17 @@ class BaseNetwork:
async def retriable_call(self, function, *args, **kwargs): async def retriable_call(self, function, *args, **kwargs):
while self.running: while self.running:
if not self.is_connected:
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
await self.on_connected.first
await self.session_pool.wait_for_fastest_session()
try: try:
return await function(*args, **kwargs) return await function(*args, **kwargs)
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning("Wallet server call timed out, retrying.") log.warning("Wallet server call timed out, retrying.")
except ConnectionError: except ConnectionError:
if not self.is_connected and self.running: pass
log.warning("Wallet server unavailable, waiting for it to come back and retry.") raise asyncio.CancelledError() # if we got here, we are shutting down
await self.on_connected.first
def _update_remote_height(self, header_args): def _update_remote_height(self, header_args):
self.remote_height = header_args[0]["height"] self.remote_height = header_args[0]["height"]
@ -242,8 +243,13 @@ class BaseNetwork:
def subscribe_headers(self): def subscribe_headers(self):
return self.rpc('blockchain.headers.subscribe', [True], session=self.client) return self.rpc('blockchain.headers.subscribe', [True], session=self.client)
def subscribe_address(self, address): async def subscribe_address(self, address):
return self.rpc('blockchain.address.subscribe', [address], session=self.client) try:
return await self.rpc('blockchain.address.subscribe', [address], session=self.client)
except asyncio.TimeoutError:
# abort and cancel, we cant lose a subscription, it will happen again on reconnect
self.client.abort()
raise asyncio.CancelledError()
class SessionPool: class SessionPool:

View file

@ -145,8 +145,8 @@ class Stream:
def first(self): def first(self):
future = asyncio.get_event_loop().create_future() future = asyncio.get_event_loop().create_future()
subscription = self.listen( subscription = self.listen(
lambda value: self._cancel_and_callback(subscription, future, value), lambda value: not future.done() and self._cancel_and_callback(subscription, future, value),
lambda exception: self._cancel_and_error(subscription, future, exception) lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception)
) )
return future return future