lbry-sdk/torba/tests/client_tests/integration/test_network.py
2019-09-18 19:28:11 -04:00

158 lines
7.8 KiB
Python

import logging
import os
import asyncio
from unittest.mock import Mock
from torba.client.basenetwork import BaseNetwork
from torba.orchstr8.node import SPVNode
from torba.rpc import RPCSession
from torba.testcase import IntegrationTestCase, AsyncioTestCase
class NetworkTests(IntegrationTestCase):
async def test_remote_height_updated_automagically(self):
initial_height = self.ledger.network.remote_height
await self.blockchain.generate(1)
await self.ledger.network.on_header.first
self.assertEqual(self.ledger.network.remote_height, initial_height + 1)
async def test_server_features(self):
self.assertEqual({'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH,
'hash_function': 'sha256',
'hosts': {},
'protocol_max': '1.4',
'protocol_min': '1.1',
'pruning': None,
'description': '',
'payment_address': '',
'daily_fee': 0,
'server_version': '0.5.7'}, await self.ledger.network.get_server_features())
await self.conductor.spv_node.stop()
address = (await self.account.get_addresses(limit=1))[0]
os.environ.update({'DESCRIPTION': 'Fastest server in the west.', 'DONATION_ADDRESS': address, 'DAILY_FEE': '42'})
await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.ledger.network.on_connected.first
self.assertEqual({'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH,
'hash_function': 'sha256',
'hosts': {},
'protocol_max': '1.4',
'protocol_min': '1.1',
'pruning': None,
'description': 'Fastest server in the west.',
'payment_address': address,
'daily_fee': 42,
'server_version': '0.5.7'}, await self.ledger.network.get_server_features())
class ReconnectTests(IntegrationTestCase):
VERBOSITY = logging.WARN
async def test_multiple_servers(self):
# we have a secondary node that connects later, so
node2 = SPVNode(self.conductor.spv_module, node_number=2)
self.ledger.network.config['default_servers'].append((node2.hostname, node2.port))
await asyncio.wait_for(self.ledger.stop(), timeout=1)
await asyncio.wait_for(self.ledger.start(), timeout=1)
self.ledger.network.session_pool.new_connection_event.clear()
await node2.start(self.blockchain)
# this is only to speed up the test as retrying would take 4+ seconds
for session in self.ledger.network.session_pool.sessions:
session.trigger_urgent_reconnect.set()
await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1)
self.assertEqual(2, len(list(self.ledger.network.session_pool.available_sessions)))
self.assertTrue(self.ledger.network.is_connected)
switch_event = self.ledger.network.on_connected.first
await node2.stop(True)
# secondary down, but primary is ok, do not switch! (switches trigger new on_connected events)
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(switch_event, timeout=1)
async def test_connection_drop_still_receives_events_after_reconnected(self):
address1 = await self.account.receiving.get_or_create_usable_address()
# disconnect and send a new tx, should reconnect and get it
self.ledger.network.client.connection_lost(Exception())
self.assertFalse(self.ledger.network.is_connected)
sendtxid = await self.blockchain.send_to_address(address1, 1.1337)
await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool
await self.blockchain.generate(1)
await self.on_transaction_id(sendtxid) # confirmed
self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine
await self.assertBalance(self.account, '1.1337')
# is it real? are we rich!? let me see this tx...
d = self.ledger.network.get_transaction(sendtxid)
# what's that smoke on my ethernet cable? oh no!
master_client = self.ledger.network.client
self.ledger.network.client.connection_lost(Exception())
with self.assertRaises(asyncio.TimeoutError):
await d
self.assertIsNone(master_client.response_time) # response time unknown as it failed
# rich but offline? no way, no water, let's retry
with self.assertRaisesRegex(ConnectionError, 'connection is not available'):
await self.ledger.network.get_transaction(sendtxid)
# * goes to pick some water outside... * time passes by and another donation comes in
sendtxid = await self.blockchain.send_to_address(address1, 42)
await self.blockchain.generate(1)
# (this is just so the test doesnt hang forever if it doesnt reconnect)
if not self.ledger.network.is_connected:
await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0)
# omg, the burned cable still works! torba is fire proof!
await self.ledger.network.get_transaction(sendtxid)
async def test_timeout_then_reconnect(self):
# tests that it connects back after some failed attempts
await self.conductor.spv_node.stop()
self.assertFalse(self.ledger.network.is_connected)
await asyncio.sleep(0.2) # let it retry and fail once
await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected)
class ServerPickingTestCase(AsyncioTestCase):
async def _make_fake_server(self, latency=1.0, port=1):
# local fake server with artificial latency
class FakeSession(RPCSession):
async def handle_request(self, request):
await asyncio.sleep(latency)
return {"height": 1}
server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port)
self.addCleanup(server.close)
return '127.0.0.1', port
async def _make_bad_server(self, port=42420):
async def echo(reader, writer):
while True:
writer.write(await reader.read())
server = await asyncio.start_server(echo, host='127.0.0.1', port=port)
self.addCleanup(server.close)
return '127.0.0.1', port
async def test_pick_fastest(self):
ledger = Mock(config={
'default_servers': [
# fast but unhealthy, should be discarded
await self._make_bad_server(),
('localhost', 1),
('example.that.doesnt.resolve', 9000),
await self._make_fake_server(latency=1.0, port=1340),
await self._make_fake_server(latency=0.1, port=1337),
await self._make_fake_server(latency=0.4, port=1339),
],
'connect_timeout': 3
})
network = BaseNetwork(ledger)
self.addCleanup(network.stop)
asyncio.ensure_future(network.start())
await asyncio.wait_for(network.on_connected.first, timeout=1)
self.assertTrue(network.is_connected)
self.assertEqual(network.client.server, ('127.0.0.1', 1337))
self.assertTrue(all([not session.is_closing() for session in network.session_pool.available_sessions]))
# ensure we are connected to all of them after a while
await asyncio.sleep(1)
self.assertEqual(len(list(network.session_pool.available_sessions)), 3)