diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index 618d71b57..041a072fd 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -16,11 +16,12 @@ class TestSessionBloat(IntegrationTestCase): LEDGER = lbry.wallet async def test_session_bloat_from_socket_timeout(self): + client = self.ledger.network.client await self.conductor.stop_spv() await self.ledger.stop() self.conductor.spv_node.session_timeout = 1 await self.conductor.start_spv() - session = ClientSession(network=None, server=self.ledger.network.client.server, timeout=0.2) + session = ClientSession(network=None, server=client.server, timeout=0.2) await session.create_connection() await session.send_request('server.banner', ()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index 8d0faed2a..75a28a28a 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -3,6 +3,7 @@ import asyncio from unittest.mock import Mock from torba.client.basenetwork import BaseNetwork +from torba.orchstr8.node import SPVNode from torba.rpc import RPCSession from torba.testcase import IntegrationTestCase, AsyncioTestCase @@ -20,6 +21,26 @@ class ReconnectTests(IntegrationTestCase): VERBOSITY = logging.WARN + async def test_multiple_servers(self): + # we have a secondary node that connects later, so + node2 = SPVNode(self.conductor.spv_module, node_number=2) + self.ledger.network.config['default_servers'].append((node2.hostname, node2.port)) + await asyncio.wait_for(self.ledger.stop(), timeout=1) + await asyncio.wait_for(self.ledger.start(), timeout=1) + self.ledger.network.session_pool.new_connection_event.clear() + await node2.start(self.blockchain) + # this is only to speed up the test as retrying would take 4+ seconds + for session in self.ledger.network.session_pool.sessions: + session.trigger_urgent_reconnect.set() + await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) + self.assertEqual(2, len(self.ledger.network.session_pool.available_sessions)) + self.assertTrue(self.ledger.network.is_connected) + switch_event = self.ledger.network.on_connected.first + await node2.stop(True) + # secondary down, but primary is ok, do not switch! (switches trigger new on_connected events) + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(switch_event, timeout=1) + async def test_connection_drop_still_receives_events_after_reconnected(self): address1 = await self.account.receiving.get_or_create_usable_address() # disconnect and send a new tx, should reconnect and get it @@ -35,10 +56,11 @@ class ReconnectTests(IntegrationTestCase): # is it real? are we rich!? let me see this tx... d = self.ledger.network.get_transaction(sendtxid) # what's that smoke on my ethernet cable? oh no! + master_client = self.ledger.network.client self.ledger.network.client.connection_lost(Exception()) with self.assertRaises(asyncio.TimeoutError): await d - self.assertIsNone(self.ledger.network.client.response_time) # response time unknown as it failed + self.assertIsNone(master_client.response_time) # response time unknown as it failed # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): await self.ledger.network.get_transaction(sendtxid) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 1d6883fd6..19dbd58b7 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -144,6 +144,7 @@ class BaseNetwork: self.config = ledger.config self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6)) self.client: Optional[ClientSession] = None + self._switch_task: Optional[asyncio.Task] = None self.running = False self.remote_height: int = 0 @@ -161,42 +162,34 @@ class BaseNetwork: 'blockchain.address.subscribe': self._on_status_controller, } - async def switch_to_fastest(self): - try: - client = await asyncio.wait_for(self.session_pool.wait_for_fastest_session(), 30) - except asyncio.TimeoutError: - if self.client: - await self.client.close() - self.client = None - for session in self.session_pool.sessions: - session.synchronous_close() - log.warning("not connected to any wallet servers") - return - current_client = self.client - self.client = client - log.info("Switching to SPV wallet server: %s:%d", *self.client.server) - self._on_connected_controller.add(True) - try: - self._update_remote_height((await self.subscribe_headers(),)) - log.info("Subscribed to headers: %s:%d", *self.client.server) - except asyncio.TimeoutError: - if self.client: - await self.client.close() - self.client = current_client - return - self.session_pool.new_connection_event.clear() - return await self.session_pool.new_connection_event.wait() + async def switch_forever(self): + while self.running: + if self.is_connected: + await self.client.on_disconnected.first + self.client = None + continue + self.client = await self.session_pool.wait_for_fastest_session() + log.info("Switching to SPV wallet server: %s:%d", *self.client.server) + self._on_connected_controller.add(True) + try: + self._update_remote_height((await self.subscribe_headers(),)) + log.info("Subscribed to headers: %s:%d", *self.client.server) + except asyncio.TimeoutError: + log.info("Switching to %s:%d timed out, closing and retrying.") + self.client.synchronous_close() + self.client = None async def start(self): self.running = True + self._switch_task = asyncio.ensure_future(self.switch_forever()) self.session_pool.start(self.config['default_servers']) self.on_header.listen(self._update_remote_height) - while self.running: - await self.switch_to_fastest() async def stop(self): - self.running = False - self.session_pool.stop() + if self.running: + self.running = False + self._switch_task.cancel() + self.session_pool.stop() @property def is_connected(self): @@ -329,8 +322,9 @@ class SessionPool: self._connect_session(server) def stop(self): - for task in self.sessions.values(): + for session, task in self.sessions.items(): task.cancel() + session.synchronous_close() self.sessions.clear() def ensure_connections(self): diff --git a/torba/torba/orchstr8/node.py b/torba/torba/orchstr8/node.py index b89899684..453016bfa 100644 --- a/torba/torba/orchstr8/node.py +++ b/torba/torba/orchstr8/node.py @@ -190,14 +190,15 @@ class WalletNode: class SPVNode: - def __init__(self, coin_class): + def __init__(self, coin_class, node_number=1): self.coin_class = coin_class self.controller = None self.data_path = None self.server = None self.hostname = 'localhost' - self.port = 50001 + 1 # avoid conflict with default daemon + self.port = 50001 + node_number # avoid conflict with default daemon self.session_timeout = 600 + self.rpc_port = '0' # disabled by default async def start(self, blockchain_node: 'BlockchainNode'): self.data_path = tempfile.mkdtemp() @@ -210,6 +211,7 @@ class SPVNode: 'SESSION_TIMEOUT': str(self.session_timeout), 'MAX_QUERY_WORKERS': '0', 'INDIVIDUAL_TAG_INDEXES': '', + 'RPC_PORT': self.rpc_port } # TODO: don't use os.environ os.environ.update(conf)