From 8fd8ebad1bdf8162cc5c89723a3ef3e1fc98f74b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Aug 2019 01:50:25 -0300 Subject: [PATCH] fix misc reconnect scenarios --- torba/torba/client/basenetwork.py | 66 +++++++++++++++++-------------- torba/torba/stream.py | 4 +- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 7a0a5053d..be559527b 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -53,18 +53,17 @@ class ClientSession(BaseClientSession): self._response_samples += 1 return result - async def send_request(self, method, args=(), timeout=None): - log.info("send %s to %s:%i", method, *self.server) - timeout = timeout or self.timeout + async def send_request(self, method, args=()): + log.debug("send %s to %s:%i", method, *self.server) self.pending_amount += 1 try: if method == 'server.version': - reply = await self.send_timed_server_version_request(args, timeout) + reply = await self.send_timed_server_version_request(args, self.timeout) else: reply = await asyncio.wait_for( - super().send_request(method, args), timeout=timeout + super().send_request(method, args), timeout=self.timeout ) - log.info("got reply for %s from %s:%i", method, *self.server) + log.debug("got reply for %s from %s:%i", method, *self.server) return reply except RPCError as e: log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", @@ -87,7 +86,7 @@ class ClientSession(BaseClientSession): async def ensure_session(self): # Handles reconnecting and maintaining a session alive # TODO: change to 'ping' on newer protocol (above 1.2) - retry_delay = default_delay = 0.1 + retry_delay = default_delay = 1.0 while True: try: if self.is_closing(): @@ -109,7 +108,9 @@ class ClientSession(BaseClientSession): self.trigger_urgent_reconnect.clear() async def ensure_server_version(self, required='1.2', timeout=3): - return await self.send_request('server.version', [__version__, required], timeout) + return await asyncio.wait_for( + self.send_request('server.version', [__version__, required]), timeout=timeout + ) async def create_connection(self, timeout=6): connector = Connector(lambda: self, *self.server) @@ -165,23 +166,20 @@ class BaseNetwork: session.synchronous_close() log.warning("not connected to any wallet servers") return - if not self.client or client.server_address_and_port != self.client.server_address_and_port: - current_client = self.client - self.client = client - log.info("Switching to SPV wallet server: %s:%d", *self.client.server) - try: - self._update_remote_height((await self.subscribe_headers(),)) - log.info("Subscribed to headers: %s:%d", *self.client.server) - if current_client: - await current_client.close() - log.info("Closed connection to %s:%i", *current_client.server) - except asyncio.TimeoutError: - if self.client: - await self.client.close() - self.client = current_client - return - self._on_connected_controller.add(True) - await asyncio.sleep(30) + current_client = self.client + self.client = client + log.info("Switching to SPV wallet server: %s:%d", *self.client.server) + self._on_connected_controller.add(True) + try: + self._update_remote_height((await self.subscribe_headers(),)) + log.info("Subscribed to headers: %s:%d", *self.client.server) + except asyncio.TimeoutError: + if self.client: + await self.client.close() + self.client = current_client + return + self.session_pool.new_connection_event.clear() + return await self.session_pool.new_connection_event.wait() async def start(self): self.running = True @@ -208,14 +206,17 @@ class BaseNetwork: async def retriable_call(self, function, *args, **kwargs): while self.running: + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + await self.on_connected.first + await self.session_pool.wait_for_fastest_session() try: return await function(*args, **kwargs) except asyncio.TimeoutError: log.warning("Wallet server call timed out, retrying.") except ConnectionError: - if not self.is_connected and self.running: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") - await self.on_connected.first + pass + raise asyncio.CancelledError() # if we got here, we are shutting down def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] @@ -242,8 +243,13 @@ class BaseNetwork: def subscribe_headers(self): return self.rpc('blockchain.headers.subscribe', [True], session=self.client) - def subscribe_address(self, address): - return self.rpc('blockchain.address.subscribe', [address], session=self.client) + async def subscribe_address(self, address): + try: + return await self.rpc('blockchain.address.subscribe', [address], session=self.client) + except asyncio.TimeoutError: + # abort and cancel, we cant lose a subscription, it will happen again on reconnect + self.client.abort() + raise asyncio.CancelledError() class SessionPool: diff --git a/torba/torba/stream.py b/torba/torba/stream.py index 412a94525..04b008688 100644 --- a/torba/torba/stream.py +++ b/torba/torba/stream.py @@ -145,8 +145,8 @@ class Stream: def first(self): future = asyncio.get_event_loop().create_future() subscription = self.listen( - lambda value: self._cancel_and_callback(subscription, future, value), - lambda exception: self._cancel_and_error(subscription, future, exception) + lambda value: not future.done() and self._cancel_and_callback(subscription, future, value), + lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception) ) return future