import logging import asyncio import lbry from unittest.mock import Mock from lbry.wallet.network import Network from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.rpc import RPCSession from lbry.testcase import IntegrationTestCase, AsyncioTestCase class NetworkTests(IntegrationTestCase): async def test_remote_height_updated_automagically(self): initial_height = self.ledger.network.remote_height await self.blockchain.generate(1) await self.ledger.network.on_header.first self.assertEqual(self.ledger.network.remote_height, initial_height + 1) async def test_server_features(self): self.assertDictEqual({ 'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH, 'hash_function': 'sha256', 'hosts': {}, 'protocol_max': '0.99.0', 'protocol_min': '0.54.0', 'pruning': None, 'description': '', 'payment_address': '', 'donation_address': '', 'daily_fee': '0', 'server_version': lbry.__version__, 'trending_algorithm': 'zscore', }, await self.ledger.network.get_server_features()) await self.conductor.spv_node.stop() payment_address, donation_address = await self.account.get_addresses(limit=2) await self.conductor.spv_node.start( self.conductor.blockchain_node, extraconf={ 'DESCRIPTION': 'Fastest server in the west.', 'PAYMENT_ADDRESS': payment_address, 'DONATION_ADDRESS': donation_address, 'DAILY_FEE': '42' } ) await self.ledger.network.on_connected.first self.assertDictEqual({ 'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH, 'hash_function': 'sha256', 'hosts': {}, 'protocol_max': '0.99.0', 'protocol_min': '0.54.0', 'pruning': None, 'description': 'Fastest server in the west.', 'payment_address': payment_address, 'donation_address': donation_address, 'daily_fee': '42', 'server_version': lbry.__version__, 'trending_algorithm': 'zscore', }, await self.ledger.network.get_server_features()) class ReconnectTests(IntegrationTestCase): async def test_multiple_servers(self): # we have a secondary node that connects later, so node2 = SPVNode(self.conductor.spv_module, node_number=2) self.ledger.network.config['default_servers'].append((node2.hostname, node2.port)) await asyncio.wait_for(self.ledger.stop(), timeout=1) await asyncio.wait_for(self.ledger.start(), timeout=1) self.ledger.network.session_pool.new_connection_event.clear() await node2.start(self.blockchain) # this is only to speed up the test as retrying would take 4+ seconds for session in self.ledger.network.session_pool.sessions: session.trigger_urgent_reconnect.set() await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) self.assertEqual(2, len(list(self.ledger.network.session_pool.available_sessions))) self.assertTrue(self.ledger.network.is_connected) switch_event = self.ledger.network.on_connected.first await node2.stop(True) # secondary down, but primary is ok, do not switch! (switches trigger new on_connected events) with self.assertRaises(asyncio.TimeoutError): await asyncio.wait_for(switch_event, timeout=1) async def test_direct_sync(self): await self.ledger.stop() initial_height = self.ledger.local_height_including_downloaded_height await self.blockchain.generate(100) while self.conductor.spv_node.server.session_mgr.notified_height < initial_height + 99: # off by 1 await asyncio.sleep(0.1) self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height) # locks header processing so we make sure we are the only ones modifying it async with self.ledger._header_processing_lock: await self.ledger.headers.open() await self.ledger.network.start() await self.ledger.network.on_connected.first await self.ledger.initial_headers_sync() self.assertEqual(initial_height + 100, self.ledger.local_height_including_downloaded_height) async def test_connection_drop_still_receives_events_after_reconnected(self): address1 = await self.account.receiving.get_or_create_usable_address() # disconnect and send a new tx, should reconnect and get it self.ledger.network.client.connection_lost(Exception()) self.assertFalse(self.ledger.network.is_connected) sendtxid = await self.blockchain.send_to_address(address1, 1.1337) await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool await self.blockchain.generate(1) await self.on_transaction_id(sendtxid) # confirmed self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine await self.assertBalance(self.account, '1.1337') # is it real? are we rich!? let me see this tx... d = self.ledger.network.get_transaction(sendtxid) # what's that smoke on my ethernet cable? oh no! master_client = self.ledger.network.client self.ledger.network.client.connection_lost(Exception()) with self.assertRaises(asyncio.TimeoutError): await d self.assertIsNone(master_client.response_time) # response time unknown as it failed # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): await self.ledger.network.get_transaction(sendtxid) # * goes to pick some water outside... * time passes by and another donation comes in sendtxid = await self.blockchain.send_to_address(address1, 42) await self.blockchain.generate(1) # (this is just so the test doesn't hang forever if it doesn't reconnect) if not self.ledger.network.is_connected: await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0) # omg, the burned cable still works! torba is fire proof! await self.ledger.network.get_transaction(sendtxid) async def test_timeout_then_reconnect(self): # tests that it connects back after some failed attempts await self.conductor.spv_node.stop() self.assertFalse(self.ledger.network.is_connected) await asyncio.sleep(0.2) # let it retry and fail once await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.ledger.network.on_connected.first self.assertTrue(self.ledger.network.is_connected) async def test_online_but_still_unavailable(self): # Edge case. See issue #2445 for context self.assertIsNotNone(self.ledger.network.session_pool.fastest_session) for session in self.ledger.network.session_pool.sessions: session.response_time = None self.assertIsNone(self.ledger.network.session_pool.fastest_session) class ServerPickingTestCase(AsyncioTestCase): async def _make_fake_server(self, latency=1.0, port=1): # local fake server with artificial latency class FakeSession(RPCSession): async def handle_request(self, request): await asyncio.sleep(latency) if request.method == 'server.version': return tuple(request.args) return {'height': 1} server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port) self.addCleanup(server.close) return '127.0.0.1', port async def _make_bad_server(self, port=42420): async def echo(reader, writer): while True: writer.write(await reader.read()) server = await asyncio.start_server(echo, host='127.0.0.1', port=port) self.addCleanup(server.close) return '127.0.0.1', port async def test_pick_fastest(self): ledger = Mock(config={ 'default_servers': [ # fast but unhealthy, should be discarded await self._make_bad_server(), ('localhost', 1), ('example.that.doesnt.resolve', 9000), await self._make_fake_server(latency=1.0, port=1340), await self._make_fake_server(latency=0.1, port=1337), await self._make_fake_server(latency=0.4, port=1339), ], 'connect_timeout': 3 }) network = Network(ledger) self.addCleanup(network.stop) asyncio.ensure_future(network.start()) await asyncio.wait_for(network.on_connected.first, timeout=1) self.assertTrue(network.is_connected) self.assertTupleEqual(network.client.server, ('127.0.0.1', 1337)) self.assertTrue(all([not session.is_closing() for session in network.session_pool.available_sessions])) # ensure we are connected to all of them after a while await asyncio.sleep(1) self.assertEqual(len(list(network.session_pool.available_sessions)), 3)