From 97c0b60a227862cd7c8a76a4cdd03ad3479448c0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 21 Aug 2019 15:58:34 -0400 Subject: [PATCH 01/10] logging --- lbry/lbry/blob_exchange/client.py | 2 +- lbry/lbry/dht/blob_announcer.py | 3 +-- lbry/lbry/dht/node.py | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index c4c8bde30..f69decb0c 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -151,7 +151,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return self._blob_bytes_received, self.close() msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ f" timeout in {self.peer_timeout}" - log.info(msg) + log.debug(msg) msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) log.info("%s at %fMB/s", msg, diff --git a/lbry/lbry/dht/blob_announcer.py b/lbry/lbry/dht/blob_announcer.py index 22d12a550..5b9cb0848 100644 --- a/lbry/lbry/dht/blob_announcer.py +++ b/lbry/lbry/dht/blob_announcer.py @@ -22,13 +22,12 @@ class BlobAnnouncer: if peers > 4: return blob_hash else: - log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) + log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) except Exception as err: if isinstance(err, asyncio.CancelledError): raise err log.warning("error announcing %s: %s", blob_hash[:8], str(err)) - async def _announce(self, batch_size: typing.Optional[int] = 10): while batch_size: if not self.node.joined.is_set(): diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index 625db46b5..57fb9baa1 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -88,10 +88,10 @@ class Node: ) stored_to = [node_id for node_id, contacted in stored_to_tup if contacted] if stored_to: - log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8], + log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8], len(stored_to), len(peers)) else: - log.warning("Failed announcing %s, stored to 0 peers", blob_hash[:8]) + log.debug("Failed announcing %s, stored to 0 peers", blob_hash[:8]) return stored_to def stop(self) -> None: From 8657a35fd71be3f5f91a8632f52cc8a7d2294ca1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 21 Aug 2019 16:16:07 -0400 Subject: [PATCH 02/10] switch_to_fastest --- torba/torba/client/basenetwork.py | 74 ++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 1693bb178..38ff07b5a 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -40,11 +40,12 @@ class ClientSession(BaseClientSession): return None return self.transport.get_extra_info('peername') - async def send_timed_server_version_request(self, args=()): + async def send_timed_server_version_request(self, args=(), timeout=None): + timeout = timeout or self.timeout log.debug("send version request to %s:%i", *self.server) start = perf_counter() result = await asyncio.wait_for( - super().send_request('server.version', args), timeout=self.timeout + super().send_request('server.version', args), timeout=timeout ) current_response_time = perf_counter() - start response_sum = (self.response_time or 0) * self._response_samples + current_response_time @@ -52,18 +53,28 @@ class ClientSession(BaseClientSession): self._response_samples += 1 return result - async def send_request(self, method, args=()): + async def send_request(self, method, args=(), timeout=None): + timeout = timeout or self.timeout self.pending_amount += 1 try: if method == 'server.version': - return await self.send_timed_server_version_request(args) + return await self.send_timed_server_version_request(args, timeout) return await asyncio.wait_for( - super().send_request(method, args), timeout=self.timeout + super().send_request(method, args), timeout=timeout ) except RPCError as e: log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) raise e + except ConnectionError: + log.warning("connection to %s:%i lost", *self.server) + self.synchronous_close() + raise asyncio.CancelledError(f"connection to {self.server[0]}:{self.server[1]} lost") + except asyncio.TimeoutError: + raise + except asyncio.CancelledError: + self.synchronous_close() + raise finally: self.pending_amount -= 1 @@ -83,19 +94,16 @@ class ClientSession(BaseClientSession): except (asyncio.TimeoutError, OSError): await self.close() retry_delay = min(60, retry_delay * 2) - log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) + log.debug("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) try: await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay) except asyncio.TimeoutError: pass - except asyncio.CancelledError: - self.synchronous_close() - raise finally: self.trigger_urgent_reconnect.clear() - def ensure_server_version(self, required='1.2'): - return self.send_request('server.version', [__version__, required]) + async def ensure_server_version(self, required='1.2', timeout=3): + return await self.send_request('server.version', [__version__, required], timeout) async def create_connection(self, timeout=6): connector = Connector(lambda: self, *self.server) @@ -120,7 +128,6 @@ class ClientSession(BaseClientSession): class BaseNetwork: def __init__(self, ledger): - self.switch_event = asyncio.Event() self.config = ledger.config self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6)) self.client: Optional[ClientSession] = None @@ -141,24 +148,41 @@ class BaseNetwork: 'blockchain.address.subscribe': self._on_status_controller, } + async def switch_to_fastest(self): + try: + client = await asyncio.wait_for(self.session_pool.wait_for_fastest_session(), 30) + except asyncio.TimeoutError: + if self.client: + await self.client.close() + self.client = None + for session in self.session_pool.sessions: + session.synchronous_close() + log.warning("not connected to any wallet servers") + return + if not self.client or client.server_address_and_port != self.client.server_address_and_port: + current_client = self.client + self.client = client + log.info("Switching to SPV wallet server: %s:%d", *self.client.server) + try: + self._update_remote_height((await self.subscribe_headers(),)) + log.info("Subscribed to headers: %s:%d", *self.client.server) + if current_client: + await current_client.close() + log.info("Closed connection to %s:%i", *current_client.server) + except asyncio.TimeoutError: + if self.client: + await self.client.close() + self.client = current_client + return + self._on_connected_controller.add(True) + await asyncio.sleep(30) + async def start(self): self.running = True self.session_pool.start(self.config['default_servers']) self.on_header.listen(self._update_remote_height) while self.running: - try: - self.client = await self.session_pool.wait_for_fastest_session() - self._update_remote_height((await self.subscribe_headers(),)) - log.info("Switching to SPV wallet server: %s:%d", *self.client.server) - self._on_connected_controller.add(True) - self.client.on_disconnected.listen(lambda _: self.switch_event.set()) - await self.switch_event.wait() - self.switch_event.clear() - except asyncio.CancelledError: - await self.stop() - raise - except asyncio.TimeoutError: - pass + await self.switch_to_fastest() async def stop(self): self.running = False From 5e8d169c50b7568b2cba7e88a4546b91fda1de8d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 22 Aug 2019 14:35:22 -0400 Subject: [PATCH 03/10] debug --- torba/torba/client/basenetwork.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 38ff07b5a..4ff0d63e0 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -54,14 +54,18 @@ class ClientSession(BaseClientSession): return result async def send_request(self, method, args=(), timeout=None): + log.info("send %s to %s:%i", method, *self.server) timeout = timeout or self.timeout self.pending_amount += 1 try: if method == 'server.version': - return await self.send_timed_server_version_request(args, timeout) - return await asyncio.wait_for( - super().send_request(method, args), timeout=timeout - ) + reply = await self.send_timed_server_version_request(args, timeout) + else: + reply = await asyncio.wait_for( + super().send_request(method, args), timeout=timeout + ) + log.info("got reply for %s from %s:%i", method, *self.server) + return reply except RPCError as e: log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) @@ -71,8 +75,10 @@ class ClientSession(BaseClientSession): self.synchronous_close() raise asyncio.CancelledError(f"connection to {self.server[0]}:{self.server[1]} lost") except asyncio.TimeoutError: + log.info("timeout sending %s to %s:%i", method, *self.server) raise except asyncio.CancelledError: + log.info("cancelled sending %s to %s:%i", method, *self.server) self.synchronous_close() raise finally: From 6c3147fcf45d159558a303a40b12cf7b813f9d78 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 22 Aug 2019 16:32:14 -0300 Subject: [PATCH 04/10] simple address subs instead of batch --- torba/torba/client/baseledger.py | 10 +++++++--- torba/torba/client/basenetwork.py | 8 ++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 88959fb38..d915b1308 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -395,9 +395,13 @@ class BaseLedger(metaclass=LedgerRegistry): async def subscribe_addresses(self, address_manager: baseaccount.AddressManager, addresses: List[str]): if self.network.is_connected and addresses: - async for address, remote_status in self.network.subscribe_address(*addresses): - # subscribe isnt a retriable call as it happens right after a connection is made - self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + await asyncio.wait([ + self.subscribe_address(address_manager, address) for address in addresses + ]) + + async def subscribe_address(self, address_manager: baseaccount.AddressManager, address: str): + remote_status = await self.network.subscribe_address(address) + self._update_tasks.add(self.update_history(address, remote_status, address_manager)) def process_status_update(self, update): address, remote_status = update diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 4ff0d63e0..7a0a5053d 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -242,12 +242,8 @@ class BaseNetwork: def subscribe_headers(self): return self.rpc('blockchain.headers.subscribe', [True], session=self.client) - async def subscribe_address(self, *addresses): - async with self.client.send_batch() as batch: - for address in addresses: - batch.add_request('blockchain.address.subscribe', [address]) - for address, status in zip(addresses, batch.results): - yield address, status + def subscribe_address(self, address): + return self.rpc('blockchain.address.subscribe', [address], session=self.client) class SessionPool: From 10bd3c9ddac65649e2e38f47e6a2816a2b8a296b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Aug 2019 01:50:25 -0300 Subject: [PATCH 05/10] fix misc reconnect scenarios --- torba/torba/client/basenetwork.py | 66 +++++++++++++++++-------------- torba/torba/stream.py | 4 +- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 7a0a5053d..be559527b 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -53,18 +53,17 @@ class ClientSession(BaseClientSession): self._response_samples += 1 return result - async def send_request(self, method, args=(), timeout=None): - log.info("send %s to %s:%i", method, *self.server) - timeout = timeout or self.timeout + async def send_request(self, method, args=()): + log.debug("send %s to %s:%i", method, *self.server) self.pending_amount += 1 try: if method == 'server.version': - reply = await self.send_timed_server_version_request(args, timeout) + reply = await self.send_timed_server_version_request(args, self.timeout) else: reply = await asyncio.wait_for( - super().send_request(method, args), timeout=timeout + super().send_request(method, args), timeout=self.timeout ) - log.info("got reply for %s from %s:%i", method, *self.server) + log.debug("got reply for %s from %s:%i", method, *self.server) return reply except RPCError as e: log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", @@ -87,7 +86,7 @@ class ClientSession(BaseClientSession): async def ensure_session(self): # Handles reconnecting and maintaining a session alive # TODO: change to 'ping' on newer protocol (above 1.2) - retry_delay = default_delay = 0.1 + retry_delay = default_delay = 1.0 while True: try: if self.is_closing(): @@ -109,7 +108,9 @@ class ClientSession(BaseClientSession): self.trigger_urgent_reconnect.clear() async def ensure_server_version(self, required='1.2', timeout=3): - return await self.send_request('server.version', [__version__, required], timeout) + return await asyncio.wait_for( + self.send_request('server.version', [__version__, required]), timeout=timeout + ) async def create_connection(self, timeout=6): connector = Connector(lambda: self, *self.server) @@ -165,23 +166,20 @@ class BaseNetwork: session.synchronous_close() log.warning("not connected to any wallet servers") return - if not self.client or client.server_address_and_port != self.client.server_address_and_port: - current_client = self.client - self.client = client - log.info("Switching to SPV wallet server: %s:%d", *self.client.server) - try: - self._update_remote_height((await self.subscribe_headers(),)) - log.info("Subscribed to headers: %s:%d", *self.client.server) - if current_client: - await current_client.close() - log.info("Closed connection to %s:%i", *current_client.server) - except asyncio.TimeoutError: - if self.client: - await self.client.close() - self.client = current_client - return - self._on_connected_controller.add(True) - await asyncio.sleep(30) + current_client = self.client + self.client = client + log.info("Switching to SPV wallet server: %s:%d", *self.client.server) + self._on_connected_controller.add(True) + try: + self._update_remote_height((await self.subscribe_headers(),)) + log.info("Subscribed to headers: %s:%d", *self.client.server) + except asyncio.TimeoutError: + if self.client: + await self.client.close() + self.client = current_client + return + self.session_pool.new_connection_event.clear() + return await self.session_pool.new_connection_event.wait() async def start(self): self.running = True @@ -208,14 +206,17 @@ class BaseNetwork: async def retriable_call(self, function, *args, **kwargs): while self.running: + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + await self.on_connected.first + await self.session_pool.wait_for_fastest_session() try: return await function(*args, **kwargs) except asyncio.TimeoutError: log.warning("Wallet server call timed out, retrying.") except ConnectionError: - if not self.is_connected and self.running: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") - await self.on_connected.first + pass + raise asyncio.CancelledError() # if we got here, we are shutting down def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] @@ -242,8 +243,13 @@ class BaseNetwork: def subscribe_headers(self): return self.rpc('blockchain.headers.subscribe', [True], session=self.client) - def subscribe_address(self, address): - return self.rpc('blockchain.address.subscribe', [address], session=self.client) + async def subscribe_address(self, address): + try: + return await self.rpc('blockchain.address.subscribe', [address], session=self.client) + except asyncio.TimeoutError: + # abort and cancel, we cant lose a subscription, it will happen again on reconnect + self.client.abort() + raise asyncio.CancelledError() class SessionPool: diff --git a/torba/torba/stream.py b/torba/torba/stream.py index 412a94525..04b008688 100644 --- a/torba/torba/stream.py +++ b/torba/torba/stream.py @@ -145,8 +145,8 @@ class Stream: def first(self): future = asyncio.get_event_loop().create_future() subscription = self.listen( - lambda value: self._cancel_and_callback(subscription, future, value), - lambda exception: self._cancel_and_error(subscription, future, exception) + lambda value: not future.done() and self._cancel_and_callback(subscription, future, value), + lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception) ) return future From 22b2cf8b0ce979cee342c979014b295787fbfc7e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Aug 2019 02:07:42 -0300 Subject: [PATCH 06/10] warn when out of sync --- torba/torba/client/baseledger.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index d915b1308..72db7b802 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -469,6 +469,15 @@ class BaseLedger(metaclass=LedgerRegistry): if address_manager is not None: await address_manager.ensure_address_gap() + local_status, local_history = await self.get_local_status_and_history(address) + + if local_status != remote_status: + log.warning( + "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", + remote_status, len(remote_history), local_status, len(local_history) + ) + else: + log.info("Sync completed for: %s", address) async def cache_transaction(self, txid, remote_height): cache_item = self._tx_cache.get(txid) From ee9ea1faab30c9a942d68485bdf87d12f80bb711 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Aug 2019 02:07:55 -0300 Subject: [PATCH 07/10] get history only from main server --- torba/torba/client/basenetwork.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index be559527b..da1cf5c70 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -221,9 +221,6 @@ class BaseNetwork: def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] - def get_history(self, address): - return self.rpc('blockchain.address.get_history', [address]) - def get_transaction(self, tx_hash): return self.rpc('blockchain.transaction.get', [tx_hash]) @@ -236,7 +233,10 @@ class BaseNetwork: def get_headers(self, height, count=10000): return self.rpc('blockchain.block.headers', [height, count]) - # --- Subscribes and broadcasts are always aimed towards the master client directly + # --- Subscribes, history and broadcasts are always aimed towards the master client directly + def get_history(self, address): + return self.rpc('blockchain.address.get_history', [address], session=self.client) + def broadcast(self, raw_transaction): return self.rpc('blockchain.transaction.broadcast', [raw_transaction], session=self.client) From 089cefb77b000fd92957c7061bce82325b08b50f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Aug 2019 13:31:00 -0300 Subject: [PATCH 08/10] make it debug for now, there are false positives --- torba/torba/client/baseledger.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 72db7b802..d127243d9 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -469,15 +469,17 @@ class BaseLedger(metaclass=LedgerRegistry): if address_manager is not None: await address_manager.ensure_address_gap() - local_status, local_history = await self.get_local_status_and_history(address) + local_status, local_history = await self.get_local_status_and_history(address) if local_status != remote_status: - log.warning( + log.debug( "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", remote_status, len(remote_history), local_status, len(local_history) ) + log.debug("local: %s", local_history) + log.debug("remote: %s", remote_history) else: - log.info("Sync completed for: %s", address) + log.debug("Sync completed for: %s", address) async def cache_transaction(self, txid, remote_height): cache_item = self._tx_cache.get(txid) From b9b8178e30f8bf0683dfb4ad1f14ab7e765d9708 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Aug 2019 13:31:17 -0300 Subject: [PATCH 09/10] use a semaphore to avoid false timeouts on large syncs --- torba/torba/client/basenetwork.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index da1cf5c70..57d8f901e 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -29,6 +29,8 @@ class ClientSession(BaseClientSession): self.pending_amount = 0 self._on_connect_cb = on_connect_callback or (lambda: None) self.trigger_urgent_reconnect = asyncio.Event() + # one request per second of timeout, conservative default + self._semaphore = asyncio.Semaphore(self.timeout) @property def available(self): @@ -54,8 +56,12 @@ class ClientSession(BaseClientSession): return result async def send_request(self, method, args=()): - log.debug("send %s to %s:%i", method, *self.server) self.pending_amount += 1 + async with self._semaphore: + return await self._send_request(method, args) + + async def _send_request(self, method, args=()): + log.debug("send %s to %s:%i", method, *self.server) try: if method == 'server.version': reply = await self.send_timed_server_version_request(args, self.timeout) From 9cfa9b2c573c1c1bb897fe7fbbf5a7fdd0f888c8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 26 Aug 2019 03:51:30 -0300 Subject: [PATCH 10/10] use only the main session, dont cancel on connectionerror --- torba/torba/client/basenetwork.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 57d8f901e..1d6883fd6 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -78,7 +78,7 @@ class ClientSession(BaseClientSession): except ConnectionError: log.warning("connection to %s:%i lost", *self.server) self.synchronous_close() - raise asyncio.CancelledError(f"connection to {self.server[0]}:{self.server[1]} lost") + raise except asyncio.TimeoutError: log.info("timeout sending %s to %s:%i", method, *self.server) raise @@ -203,8 +203,10 @@ class BaseNetwork: return self.client and not self.client.is_closing() def rpc(self, list_or_method, args, session=None): - session = session or self.session_pool.fastest_session - if session: + # fixme: use fastest unloaded session, but for now it causes issues with wallet sync + # session = session or self.session_pool.fastest_session + session = self.client + if session and not session.is_closing(): return session.send_request(list_or_method, args) else: self.session_pool.trigger_nodelay_connect()