From f567aca532a0eaa722f3692892f8d08d66ef4627 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 18 Aug 2019 15:40:38 -0300 Subject: [PATCH 01/16] retry and batch requests, fix some loose ends --- torba/tests/client_tests/unit/test_ledger.py | 6 ++- torba/torba/client/baseledger.py | 19 ++++----- torba/torba/client/basenetwork.py | 45 ++++++++++++-------- torba/torba/rpc/jsonrpc.py | 6 +-- torba/torba/rpc/session.py | 7 +-- 5 files changed, 47 insertions(+), 36 deletions(-) diff --git a/torba/tests/client_tests/unit/test_ledger.py b/torba/tests/client_tests/unit/test_ledger.py index 0e077b441..beb26a5d1 100644 --- a/torba/tests/client_tests/unit/test_ledger.py +++ b/torba/tests/client_tests/unit/test_ledger.py @@ -18,6 +18,9 @@ class MockNetwork: self.get_transaction_called = [] self.is_connected = False + def retriable_call(self, function, *args, **kwargs): + return function(*args, **kwargs) + async def get_history(self, address): self.get_history_called.append(address) self.address = address @@ -121,8 +124,9 @@ class TestSynchronization(LedgerTestCase): ) -class MocHeaderNetwork: +class MocHeaderNetwork(MockNetwork): def __init__(self, responses): + super().__init__(None, None) self.responses = responses async def get_headers(self, height, blocks): diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 8f16455d9..52dba6c54 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -310,7 +310,7 @@ class BaseLedger(metaclass=LedgerRegistry): subscription_update = False if not headers: - header_response = await self.network.get_headers(height, 2001) + header_response = await self.network.retriable_call(self.network.get_headers, height, 2001) headers = header_response['hex'] if not headers: @@ -395,13 +395,9 @@ class BaseLedger(metaclass=LedgerRegistry): async def subscribe_addresses(self, address_manager: baseaccount.AddressManager, addresses: List[str]): if self.network.is_connected and addresses: - await asyncio.wait([ - self.subscribe_address(address_manager, address) for address in addresses - ]) - - async def subscribe_address(self, address_manager: baseaccount.AddressManager, address: str): - remote_status = await self.network.subscribe_address(address) - self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + async for address, remote_status in self.network.subscribe_address(*addresses): + # subscribe isnt a retriable call as it happens right after a connection is made + self._update_tasks.add(self.update_history(address, remote_status, address_manager)) def process_status_update(self, update): address, remote_status = update @@ -417,7 +413,7 @@ class BaseLedger(metaclass=LedgerRegistry): if local_status == remote_status: return - remote_history = await self.network.get_history(address) + remote_history = await self.network.retriable_call(self.network.get_history, address) cache_tasks = [] synced_history = StringIO() @@ -489,7 +485,7 @@ class BaseLedger(metaclass=LedgerRegistry): if tx is None: # fetch from network - _raw = await self.network.get_transaction(txid) + _raw = await self.network.retriable_call(self.network.get_transaction, txid) if _raw: tx = self.transaction_class(unhexlify(_raw)) await self.maybe_verify_transaction(tx, remote_height) @@ -510,7 +506,7 @@ class BaseLedger(metaclass=LedgerRegistry): async def maybe_verify_transaction(self, tx, remote_height): tx.height = remote_height if 0 < remote_height <= len(self.headers): - merkle = await self.network.get_merkle(tx.id, remote_height) + merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height) merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash) header = self.headers[remote_height] tx.position = merkle['pos'] @@ -524,6 +520,7 @@ class BaseLedger(metaclass=LedgerRegistry): return None def broadcast(self, tx): + # broadcast cant be a retriable call yet return self.network.broadcast(hexlify(tx.raw).decode()) async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=None): diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 39ed1fbdf..5ae9f1a47 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -135,31 +135,33 @@ class BaseNetwork: self.running = False if self.session_pool: self.session_pool.stop() - if self.is_connected: - disconnected = self.client.on_disconnected.first - await self.client.close() - await disconnected @property def is_connected(self): return self.client and not self.client.is_closing() - def rpc(self, list_or_method, args): - fastest = self.session_pool.fastest_session - if fastest is not None and self.client != fastest: - self.switch_event.set() - if self.is_connected: - return self.client.send_request(list_or_method, args) + def rpc(self, list_or_method, args, session=None): + session = session or self.session_pool.fastest_session + if session: + return session.send_request(list_or_method, args) else: self.session_pool.trigger_nodelay_connect() raise ConnectionError("Attempting to send rpc request when connection is not available.") + async def retriable_call(self, function, *args, **kwargs): + while True: + try: + return await function(*args, **kwargs) + except asyncio.TimeoutError: + log.warning("Wallet server call timed out, retrying.") + except ConnectionError: + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + await self.on_connected.first + def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] - def broadcast(self, raw_transaction): - return self.rpc('blockchain.transaction.broadcast', [raw_transaction]) - def get_history(self, address): return self.rpc('blockchain.address.get_history', [address]) @@ -175,11 +177,19 @@ class BaseNetwork: def get_headers(self, height, count=10000): return self.rpc('blockchain.block.headers', [height, count]) - def subscribe_headers(self): - return self.rpc('blockchain.headers.subscribe', [True]) + # --- Subscribes and broadcasts are always aimed towards the master client directly + def broadcast(self, raw_transaction): + return self.rpc('blockchain.transaction.broadcast', [raw_transaction], session=self.client) - def subscribe_address(self, address): - return self.rpc('blockchain.address.subscribe', [address]) + def subscribe_headers(self): + return self.rpc('blockchain.headers.subscribe', [True], session=self.client) + + async def subscribe_address(self, *addresses): + async with self.client.send_batch() as batch: + for address in addresses: + batch.add_request('blockchain.address.subscribe', [address]) + for address, status in zip(addresses, batch.results): + yield address, status class SessionPool: @@ -218,6 +228,7 @@ class SessionPool: def stop(self): for session, task in self.sessions.items(): task.cancel() + session.connection_lost(asyncio.CancelledError()) session.abort() self.sessions.clear() diff --git a/torba/torba/rpc/jsonrpc.py b/torba/torba/rpc/jsonrpc.py index 4e5cca8ca..2e8bfa2a7 100644 --- a/torba/torba/rpc/jsonrpc.py +++ b/torba/torba/rpc/jsonrpc.py @@ -746,10 +746,8 @@ class JSONRPCConnection(object): self._protocol = item return self.receive_message(message) - def time_out_pending_requests(self): - """Times out all pending requests.""" - # this used to be CancelledError, but thats confusing as in are we closing the whole sdk or failing? - exception = asyncio.TimeoutError() + def raise_pending_requests(self, exception): + exception = exception or asyncio.TimeoutError() for request, event in self._requests.values(): event.result = exception event.set() diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index e16b6bbb4..9ff3e7ed0 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -146,7 +146,7 @@ class SessionBase(asyncio.Protocol): await asyncio.wait_for(self._can_send.wait(), secs) except asyncio.TimeoutError: self.abort() - raise asyncio.CancelledError(f'task timed out after {secs}s') + raise asyncio.TimeoutError(f'task timed out after {secs}s') async def _send_message(self, message): if not self._can_send.is_set(): @@ -215,7 +215,8 @@ class SessionBase(asyncio.Protocol): self._address = None self.transport = None self._task_group.cancel() - self._pm_task.cancel() + if self._pm_task: + self._pm_task.cancel() # Release waiting tasks self._can_send.set() @@ -456,7 +457,7 @@ class RPCSession(SessionBase): def connection_lost(self, exc): # Cancel pending requests and message processing - self.connection.time_out_pending_requests() + self.connection.raise_pending_requests(exc) super().connection_lost(exc) # External API From 20af1396b9dbc31eab012f081f2bfd94b4e6d1cd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Aug 2019 11:58:18 -0300 Subject: [PATCH 02/16] refactor stop --- torba/torba/client/basenetwork.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 5ae9f1a47..5a4650a76 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -67,6 +67,9 @@ class ClientSession(BaseClientSession): await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay) except asyncio.TimeoutError: pass + except asyncio.CancelledError as exception: + self.connection_lost(exception) + raise exception finally: self.trigger_urgent_reconnect.clear() @@ -133,8 +136,7 @@ class BaseNetwork: async def stop(self): self.running = False - if self.session_pool: - self.session_pool.stop() + self.session_pool.stop() @property def is_connected(self): @@ -149,13 +151,13 @@ class BaseNetwork: raise ConnectionError("Attempting to send rpc request when connection is not available.") async def retriable_call(self, function, *args, **kwargs): - while True: + while self.running: try: return await function(*args, **kwargs) except asyncio.TimeoutError: log.warning("Wallet server call timed out, retrying.") except ConnectionError: - if not self.is_connected: + if not self.is_connected and self.running: log.warning("Wallet server unavailable, waiting for it to come back and retry.") await self.on_connected.first @@ -226,10 +228,8 @@ class SessionPool: self.ensure_connections() def stop(self): - for session, task in self.sessions.items(): + for task in self.sessions.values(): task.cancel() - session.connection_lost(asyncio.CancelledError()) - session.abort() self.sessions.clear() def ensure_connections(self): From c826c7da0d9bb863816174f7bc419a3219b3fa97 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Aug 2019 13:30:45 -0300 Subject: [PATCH 03/16] take load into consideration when picking fastest --- torba/torba/client/basenetwork.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 5a4650a76..ad2f8e5eb 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -20,7 +20,8 @@ class ClientSession(BaseClientSession): super().__init__(*args, **kwargs) self._on_disconnect_controller = StreamController() self.on_disconnected = self._on_disconnect_controller.stream - self.bw_limit = self.framer.max_size = self.max_errors = 1 << 32 + self.framer.max_size = self.max_errors = 1 << 32 + self.bw_limit = -1 self.timeout = timeout self.max_seconds_idle = timeout * 2 self.response_time: Optional[float] = None @@ -31,6 +32,10 @@ class ClientSession(BaseClientSession): def available(self): return not self.is_closing() and self._can_send.is_set() and self.response_time is not None + @property + def pending_size(self): + return len(self.connection.pending_requests()) + async def send_request(self, method, args=()): try: start = perf_counter() @@ -215,7 +220,8 @@ class SessionPool: if not self.available_sessions: return None return min( - [(session.response_time, session) for session in self.available_sessions], key=itemgetter(0) + [(session.response_time * session.pending_size, session) for session in self.available_sessions], + key=itemgetter(0) )[1] def start(self, default_servers): From af797296ed8c40f5b40abb8a3b45d92fae535e08 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Aug 2019 16:50:24 -0300 Subject: [PATCH 04/16] rolling averages on response time --- torba/torba/client/basenetwork.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index ad2f8e5eb..466573d57 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -25,6 +25,8 @@ class ClientSession(BaseClientSession): self.timeout = timeout self.max_seconds_idle = timeout * 2 self.response_time: Optional[float] = None + self._response_samples = 0 + self.pending_amount = 0 self._on_connect_cb = on_connect_callback or (lambda: None) self.trigger_urgent_reconnect = asyncio.Event() @@ -32,24 +34,30 @@ class ClientSession(BaseClientSession): def available(self): return not self.is_closing() and self._can_send.is_set() and self.response_time is not None - @property - def pending_size(self): - return len(self.connection.pending_requests()) - async def send_request(self, method, args=()): try: + self.pending_amount += 1 start = perf_counter() result = await asyncio.wait_for( super().send_request(method, args), timeout=self.timeout ) - self.response_time = perf_counter() - start + current_response_time = perf_counter() - start + if self.response_time: + self.response_time = (self.response_time * self._response_samples + + current_response_time) / (self._response_samples + 1) + else: + self.response_time = current_response_time + self._response_samples += 1 return result except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) raise e except TimeoutError: self.response_time = None + self._response_samples = 0 raise + finally: + self.pending_amount -= 1 async def ensure_session(self): # Handles reconnecting and maintaining a session alive @@ -93,6 +101,7 @@ class ClientSession(BaseClientSession): log.debug("Connection lost: %s:%d", *self.server) super().connection_lost(exc) self.response_time = None + self._response_samples = 0 self._on_disconnect_controller.add(True) @@ -220,7 +229,8 @@ class SessionPool: if not self.available_sessions: return None return min( - [(session.response_time * session.pending_size, session) for session in self.available_sessions], + [(session.response_time * session.pending_amount, session) + for session in self.available_sessions], key=itemgetter(0) )[1] From 7b0e4617d32414cc0c8a04ca304e8e4865c472b3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Aug 2019 17:22:07 -0300 Subject: [PATCH 05/16] fix db test stability --- torba/tests/client_tests/unit/test_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torba/tests/client_tests/unit/test_database.py b/torba/tests/client_tests/unit/test_database.py index e5217d12c..569831c2a 100644 --- a/torba/tests/client_tests/unit/test_database.py +++ b/torba/tests/client_tests/unit/test_database.py @@ -255,7 +255,7 @@ class TestQueries(AsyncioTestCase): self.ledger.db.db.execute_fetchall = check_parameters_length account = await self.create_account() tx = await self.create_tx_from_nothing(account, 0) - for height in range(1200): + for height in range(1, 1200): tx = await self.create_tx_from_txo(tx.outputs[0], account, height=height) variable_limit = self.ledger.db.MAX_QUERY_VARIABLES for limit in range(variable_limit-2, variable_limit+2): From 07bae26fd3f813d71d06f13a9a150f83e829db55 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 14:16:56 -0400 Subject: [PATCH 06/16] close connection and reraise on CancelledError --- torba/torba/client/basenetwork.py | 6 +++--- torba/torba/rpc/session.py | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 466573d57..58a51eef3 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -80,9 +80,9 @@ class ClientSession(BaseClientSession): await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay) except asyncio.TimeoutError: pass - except asyncio.CancelledError as exception: - self.connection_lost(exception) - raise exception + except asyncio.CancelledError: + self.synchronous_close() + raise finally: self.trigger_urgent_reconnect.clear() diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index 9ff3e7ed0..7b0350b15 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -254,6 +254,7 @@ class SessionBase(asyncio.Protocol): if self.transport: self.transport.abort() + # TODO: replace with synchronous_close async def close(self, *, force_after=30): """Close the connection and return when closed.""" self._close() @@ -263,6 +264,11 @@ class SessionBase(asyncio.Protocol): self.abort() await self._pm_task + def synchronous_close(self): + self._close() + if self._pm_task and not self._pm_task.done(): + self._pm_task.cancel() + class MessageSession(SessionBase): """Session class for protocols where messages are not tied to responses, From 72690b5cff021bc19274031655b48ad142ac435f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 14:19:50 -0400 Subject: [PATCH 07/16] raise asyncio.TimeoutError --- torba/torba/client/baseledger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 52dba6c54..c11c8648b 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -542,4 +542,4 @@ class BaseLedger(metaclass=LedgerRegistry): )) for address_record in records ], timeout=timeout) if pending: - raise TimeoutError('Timed out waiting for transaction.') + raise asyncio.TimeoutError('Timed out waiting for transaction.') From 0e48772c848b1fcf0df1558940f1c31a8058669a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 14:45:53 -0400 Subject: [PATCH 08/16] track connection latency --- torba/torba/client/basenetwork.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 58a51eef3..01f6f906f 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -25,6 +25,7 @@ class ClientSession(BaseClientSession): self.timeout = timeout self.max_seconds_idle = timeout * 2 self.response_time: Optional[float] = None + self.connection_latency: Optional[float] = None self._response_samples = 0 self.pending_amount = 0 self._on_connect_cb = on_connect_callback or (lambda: None) @@ -91,7 +92,9 @@ class ClientSession(BaseClientSession): async def create_connection(self, timeout=6): connector = Connector(lambda: self, *self.server) + start = perf_counter() await asyncio.wait_for(connector.create_connection(), timeout=timeout) + self.connection_latency = perf_counter() - start async def handle_request(self, request): controller = self.network.subscription_controllers[request.method] @@ -101,6 +104,7 @@ class ClientSession(BaseClientSession): log.debug("Connection lost: %s:%d", *self.server) super().connection_lost(exc) self.response_time = None + self.connection_latency = None self._response_samples = 0 self._on_disconnect_controller.add(True) From 90457375045ee976b64ab19aaa7a75750a021af9 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 14:52:49 -0400 Subject: [PATCH 09/16] track response time for server.version only --- torba/torba/client/basenetwork.py | 45 ++++++++++++++++--------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 01f6f906f..b6190b973 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -1,7 +1,7 @@ import logging import asyncio from operator import itemgetter -from typing import Dict, Optional +from typing import Dict, Optional, Tuple from time import time, perf_counter from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -13,7 +13,6 @@ log = logging.getLogger(__name__) class ClientSession(BaseClientSession): - def __init__(self, *args, network, server, timeout=30, on_connect_callback=None, **kwargs): self.network = network self.server = server @@ -35,30 +34,32 @@ class ClientSession(BaseClientSession): def available(self): return not self.is_closing() and self._can_send.is_set() and self.response_time is not None + @property + def server_address_and_port(self) -> Optional[Tuple[str, int]]: + if self.transport: + return self.transport.get_extra_info('peername') + + async def send_timed_server_version_request(self, args=()): + start = perf_counter() + result = await asyncio.wait_for( + super().send_request('server.version', args), timeout=self.timeout + ) + current_response_time = perf_counter() - start + response_sum = self.response_time * self._response_samples + current_response_time + self.response_time = response_sum / (self._response_samples + 1) + self._response_samples += 1 + return result + async def send_request(self, method, args=()): + if method == 'server.version': + return await self.send_timed_server_version_request(args) try: - self.pending_amount += 1 - start = perf_counter() - result = await asyncio.wait_for( + return await asyncio.wait_for( super().send_request(method, args), timeout=self.timeout ) - current_response_time = perf_counter() - start - if self.response_time: - self.response_time = (self.response_time * self._response_samples - + current_response_time) / (self._response_samples + 1) - else: - self.response_time = current_response_time - self._response_samples += 1 - return result except RPCError as e: - log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) + log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) raise e - except TimeoutError: - self.response_time = None - self._response_samples = 0 - raise - finally: - self.pending_amount -= 1 async def ensure_session(self): # Handles reconnecting and maintaining a session alive @@ -70,8 +71,8 @@ class ClientSession(BaseClientSession): await self.create_connection(self.timeout) await self.ensure_server_version() self._on_connect_cb() - if (time() - self.last_send) > self.max_seconds_idle or self.response_time is None: - await self.send_request('server.banner') + if (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None: + await self.ensure_server_version() retry_delay = default_delay except (asyncio.TimeoutError, OSError): await self.close() From 1bef56f030ce78abad4c532d6c1df999ec7c3934 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 14:55:24 -0400 Subject: [PATCH 10/16] disconnect from duplicate servers (same ip, different dns) --- torba/torba/client/basenetwork.py | 50 ++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index b6190b973..b7293ec37 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -239,14 +239,45 @@ class SessionPool: key=itemgetter(0) )[1] + def _get_session_connect_callback(self, session: ClientSession): + loop = asyncio.get_event_loop() + + def callback(): + duplicate_connections = [ + s for s in self.sessions + if s is not session and s.server_address_and_port == session.server_address_and_port + ] + already_connected = None if not duplicate_connections else duplicate_connections[0] + if already_connected: + self.sessions.pop(session).cancel() + session.synchronous_close() + log.info("wallet server %s resolves to the same server as %s, rechecking in an hour", + session.server[0], already_connected.server[0]) + loop.call_later(3600, self._connect_session, session.server) + return + self.new_connection_event.set() + log.info("connected to %s:%i", *session.server) + + return callback + + def _connect_session(self, server: Tuple[str, int]): + session = None + for s in self.sessions: + if s.server == server: + session = s + break + if not session: + session = ClientSession( + network=self.network, server=server + ) + session._on_connect_cb = self._get_session_connect_callback(session) + if session not in self.sessions or not self.sessions[session] or self.sessions[session].done(): + self.sessions[session] = asyncio.create_task(session.ensure_session()) + self.sessions[session].add_done_callback(lambda _: self.ensure_connections()) + def start(self, default_servers): - callback = self.new_connection_event.set - self.sessions = { - ClientSession( - network=self.network, server=server, on_connect_callback=callback - ): None for server in default_servers - } - self.ensure_connections() + for server in default_servers: + self._connect_session(server) def stop(self): for task in self.sessions.values(): @@ -255,10 +286,7 @@ class SessionPool: def ensure_connections(self): for session, task in list(self.sessions.items()): - if not task or task.done(): - task = asyncio.create_task(session.ensure_session()) - task.add_done_callback(lambda _: self.ensure_connections()) - self.sessions[session] = task + self._connect_session(session.server) def trigger_nodelay_connect(self): # used when other parts of the system sees we might have internet back From a5af3866fda908b241f0ce3c4f1b2a9c9e53c9c7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 14:56:13 -0400 Subject: [PATCH 11/16] time.time -> time.perf_counter --- torba/torba/rpc/session.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index 7b0350b15..dd9909cfd 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -103,7 +103,7 @@ class SessionBase(asyncio.Protocol): # Force-close a connection if a send doesn't succeed in this time self.max_send_delay = 60 # Statistics. The RPC object also keeps its own statistics. - self.start_time = time.time() + self.start_time = time.perf_counter() self.errors = 0 self.send_count = 0 self.send_size = 0 @@ -123,7 +123,7 @@ class SessionBase(asyncio.Protocol): # A non-positive value means not to limit concurrency if self.bw_limit <= 0: return - now = time.time() + now = time.perf_counter() # Reduce the recorded usage in proportion to the elapsed time refund = (now - self.bw_time) * (self.bw_limit / 3600) self.bw_charge = max(0, self.bw_charge - int(refund)) @@ -156,7 +156,7 @@ class SessionBase(asyncio.Protocol): self.send_size += len(framed_message) self._using_bandwidth(len(framed_message)) self.send_count += 1 - self.last_send = time.time() + self.last_send = time.perf_counter() if self.verbosity >= 4: self.logger.debug(f'Sending framed message {framed_message}') self.transport.write(framed_message) @@ -303,7 +303,7 @@ class MessageSession(SessionBase): ) self._bump_errors() else: - self.last_recv = time.time() + self.last_recv = time.perf_counter() self.recv_count += 1 if self.recv_count % 10 == 0: await self._update_concurrency() @@ -423,7 +423,7 @@ class RPCSession(SessionBase): self.logger.warning(f'{e!r}') continue - self.last_recv = time.time() + self.last_recv = time.perf_counter() self.recv_count += 1 if self.recv_count % 10 == 0: await self._update_concurrency() From c590954eb981808b66a8c1d01a3e95cb5b847cd6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 15:11:01 -0400 Subject: [PATCH 12/16] include connection latency in server selection --- torba/torba/client/basenetwork.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index b7293ec37..a75ff7eb1 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -40,26 +40,30 @@ class ClientSession(BaseClientSession): return self.transport.get_extra_info('peername') async def send_timed_server_version_request(self, args=()): + log.debug("send version request to %s:%i", *self.server) start = perf_counter() result = await asyncio.wait_for( super().send_request('server.version', args), timeout=self.timeout ) current_response_time = perf_counter() - start - response_sum = self.response_time * self._response_samples + current_response_time + response_sum = (self.response_time or 0) * self._response_samples + current_response_time self.response_time = response_sum / (self._response_samples + 1) self._response_samples += 1 return result async def send_request(self, method, args=()): - if method == 'server.version': - return await self.send_timed_server_version_request(args) + self.pending_amount += 1 try: + if method == 'server.version': + return await self.send_timed_server_version_request(args) return await asyncio.wait_for( super().send_request(method, args), timeout=self.timeout ) except RPCError as e: log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) raise e + finally: + self.pending_amount -= 1 async def ensure_session(self): # Handles reconnecting and maintaining a session alive @@ -107,6 +111,7 @@ class ClientSession(BaseClientSession): self.response_time = None self.connection_latency = None self._response_samples = 0 + self.pending_amount = 0 self._on_disconnect_controller.add(True) @@ -234,7 +239,7 @@ class SessionPool: if not self.available_sessions: return None return min( - [(session.response_time * session.pending_amount, session) + [((session.response_time + session.connection_latency) * (session.pending_amount + 1), session) for session in self.available_sessions], key=itemgetter(0) )[1] From 89e395b5f4fe8edec32fd2f7b50eab547fe7cf85 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 15:26:00 -0400 Subject: [PATCH 13/16] pylint --- torba/torba/client/basenetwork.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index a75ff7eb1..5049795e4 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -2,7 +2,7 @@ import logging import asyncio from operator import itemgetter from typing import Dict, Optional, Tuple -from time import time, perf_counter +from time import perf_counter from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -60,7 +60,8 @@ class ClientSession(BaseClientSession): super().send_request(method, args), timeout=self.timeout ) except RPCError as e: - log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) + log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", + *self.server, *e.args) raise e finally: self.pending_amount -= 1 @@ -257,7 +258,7 @@ class SessionPool: self.sessions.pop(session).cancel() session.synchronous_close() log.info("wallet server %s resolves to the same server as %s, rechecking in an hour", - session.server[0], already_connected.server[0]) + session.server[0], already_connected.server[0]) loop.call_later(3600, self._connect_session, session.server) return self.new_connection_event.set() @@ -290,7 +291,7 @@ class SessionPool: self.sessions.clear() def ensure_connections(self): - for session, task in list(self.sessions.items()): + for session in self.sessions: self._connect_session(session.server) def trigger_nodelay_connect(self): From b82788102483f8a7506367336352790f5fc126c0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 15:35:06 -0400 Subject: [PATCH 14/16] mypy --- torba/torba/client/basenetwork.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 5049795e4..68d534e1d 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -36,8 +36,9 @@ class ClientSession(BaseClientSession): @property def server_address_and_port(self) -> Optional[Tuple[str, int]]: - if self.transport: - return self.transport.get_extra_info('peername') + if not self.transport: + return + return self.transport.get_extra_info('peername') async def send_timed_server_version_request(self, args=()): log.debug("send version request to %s:%i", *self.server) @@ -278,8 +279,9 @@ class SessionPool: ) session._on_connect_cb = self._get_session_connect_callback(session) if session not in self.sessions or not self.sessions[session] or self.sessions[session].done(): - self.sessions[session] = asyncio.create_task(session.ensure_session()) - self.sessions[session].add_done_callback(lambda _: self.ensure_connections()) + task = asyncio.create_task(session.ensure_session()) + task.add_done_callback(lambda _: self.ensure_connections()) + self.sessions[session] = task def start(self, default_servers): for server in default_servers: From f6ba09b1715fcfbb203ac2f8cba3a43e145b3453 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 15:41:27 -0400 Subject: [PATCH 15/16] mypy --- torba/torba/client/basenetwork.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 68d534e1d..572d242a7 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -37,7 +37,7 @@ class ClientSession(BaseClientSession): @property def server_address_and_port(self) -> Optional[Tuple[str, int]]: if not self.transport: - return + return None return self.transport.get_extra_info('peername') async def send_timed_server_version_request(self, args=()): @@ -278,7 +278,8 @@ class SessionPool: network=self.network, server=server ) session._on_connect_cb = self._get_session_connect_callback(session) - if session not in self.sessions or not self.sessions[session] or self.sessions[session].done(): + task = self.sessions.get(session, None) + if not task or task.done(): task = asyncio.create_task(session.ensure_session()) task.add_done_callback(lambda _: self.ensure_connections()) self.sessions[session] = task From c4e0c659a7b4af0d2298594f3c6fbfad7cf347da Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 20 Aug 2019 19:22:22 -0400 Subject: [PATCH 16/16] fix missing perf_counter --- torba/torba/server/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torba/torba/server/session.py b/torba/torba/server/session.py index 9c5981d22..db0e6fa85 100644 --- a/torba/torba/server/session.py +++ b/torba/torba/server/session.py @@ -258,7 +258,7 @@ class SessionManager: session_timeout = self.env.session_timeout while True: await sleep(session_timeout // 10) - stale_cutoff = time.time() - session_timeout + stale_cutoff = time.perf_counter() - session_timeout stale_sessions = [session for session in self.sessions if session.last_recv < stale_cutoff] if stale_sessions: