switch tests and fixes + support for multiple servers

This commit is contained in:
Victor Shyba 2019-08-29 01:12:49 -03:00
parent 9df659b647
commit 96800de052
4 changed files with 53 additions and 34 deletions

View file

@ -16,11 +16,12 @@ class TestSessionBloat(IntegrationTestCase):
LEDGER = lbry.wallet LEDGER = lbry.wallet
async def test_session_bloat_from_socket_timeout(self): async def test_session_bloat_from_socket_timeout(self):
client = self.ledger.network.client
await self.conductor.stop_spv() await self.conductor.stop_spv()
await self.ledger.stop() await self.ledger.stop()
self.conductor.spv_node.session_timeout = 1 self.conductor.spv_node.session_timeout = 1
await self.conductor.start_spv() await self.conductor.start_spv()
session = ClientSession(network=None, server=self.ledger.network.client.server, timeout=0.2) session = ClientSession(network=None, server=client.server, timeout=0.2)
await session.create_connection() await session.create_connection()
await session.send_request('server.banner', ()) await session.send_request('server.banner', ())
self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1)

View file

@ -3,6 +3,7 @@ import asyncio
from unittest.mock import Mock from unittest.mock import Mock
from torba.client.basenetwork import BaseNetwork from torba.client.basenetwork import BaseNetwork
from torba.orchstr8.node import SPVNode
from torba.rpc import RPCSession from torba.rpc import RPCSession
from torba.testcase import IntegrationTestCase, AsyncioTestCase from torba.testcase import IntegrationTestCase, AsyncioTestCase
@ -20,6 +21,26 @@ class ReconnectTests(IntegrationTestCase):
VERBOSITY = logging.WARN VERBOSITY = logging.WARN
async def test_multiple_servers(self):
# we have a secondary node that connects later, so
node2 = SPVNode(self.conductor.spv_module, node_number=2)
self.ledger.network.config['default_servers'].append((node2.hostname, node2.port))
await asyncio.wait_for(self.ledger.stop(), timeout=1)
await asyncio.wait_for(self.ledger.start(), timeout=1)
self.ledger.network.session_pool.new_connection_event.clear()
await node2.start(self.blockchain)
# this is only to speed up the test as retrying would take 4+ seconds
for session in self.ledger.network.session_pool.sessions:
session.trigger_urgent_reconnect.set()
await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1)
self.assertEqual(2, len(self.ledger.network.session_pool.available_sessions))
self.assertTrue(self.ledger.network.is_connected)
switch_event = self.ledger.network.on_connected.first
await node2.stop(True)
# secondary down, but primary is ok, do not switch! (switches trigger new on_connected events)
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(switch_event, timeout=1)
async def test_connection_drop_still_receives_events_after_reconnected(self): async def test_connection_drop_still_receives_events_after_reconnected(self):
address1 = await self.account.receiving.get_or_create_usable_address() address1 = await self.account.receiving.get_or_create_usable_address()
# disconnect and send a new tx, should reconnect and get it # disconnect and send a new tx, should reconnect and get it
@ -35,10 +56,11 @@ class ReconnectTests(IntegrationTestCase):
# is it real? are we rich!? let me see this tx... # is it real? are we rich!? let me see this tx...
d = self.ledger.network.get_transaction(sendtxid) d = self.ledger.network.get_transaction(sendtxid)
# what's that smoke on my ethernet cable? oh no! # what's that smoke on my ethernet cable? oh no!
master_client = self.ledger.network.client
self.ledger.network.client.connection_lost(Exception()) self.ledger.network.client.connection_lost(Exception())
with self.assertRaises(asyncio.TimeoutError): with self.assertRaises(asyncio.TimeoutError):
await d await d
self.assertIsNone(self.ledger.network.client.response_time) # response time unknown as it failed self.assertIsNone(master_client.response_time) # response time unknown as it failed
# rich but offline? no way, no water, let's retry # rich but offline? no way, no water, let's retry
with self.assertRaisesRegex(ConnectionError, 'connection is not available'): with self.assertRaisesRegex(ConnectionError, 'connection is not available'):
await self.ledger.network.get_transaction(sendtxid) await self.ledger.network.get_transaction(sendtxid)

View file

@ -144,6 +144,7 @@ class BaseNetwork:
self.config = ledger.config self.config = ledger.config
self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6)) self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6))
self.client: Optional[ClientSession] = None self.client: Optional[ClientSession] = None
self._switch_task: Optional[asyncio.Task] = None
self.running = False self.running = False
self.remote_height: int = 0 self.remote_height: int = 0
@ -161,42 +162,34 @@ class BaseNetwork:
'blockchain.address.subscribe': self._on_status_controller, 'blockchain.address.subscribe': self._on_status_controller,
} }
async def switch_to_fastest(self): async def switch_forever(self):
try: while self.running:
client = await asyncio.wait_for(self.session_pool.wait_for_fastest_session(), 30) if self.is_connected:
except asyncio.TimeoutError: await self.client.on_disconnected.first
if self.client: self.client = None
await self.client.close() continue
self.client = None self.client = await self.session_pool.wait_for_fastest_session()
for session in self.session_pool.sessions: log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
session.synchronous_close() self._on_connected_controller.add(True)
log.warning("not connected to any wallet servers") try:
return self._update_remote_height((await self.subscribe_headers(),))
current_client = self.client log.info("Subscribed to headers: %s:%d", *self.client.server)
self.client = client except asyncio.TimeoutError:
log.info("Switching to SPV wallet server: %s:%d", *self.client.server) log.info("Switching to %s:%d timed out, closing and retrying.")
self._on_connected_controller.add(True) self.client.synchronous_close()
try: self.client = None
self._update_remote_height((await self.subscribe_headers(),))
log.info("Subscribed to headers: %s:%d", *self.client.server)
except asyncio.TimeoutError:
if self.client:
await self.client.close()
self.client = current_client
return
self.session_pool.new_connection_event.clear()
return await self.session_pool.new_connection_event.wait()
async def start(self): async def start(self):
self.running = True self.running = True
self._switch_task = asyncio.ensure_future(self.switch_forever())
self.session_pool.start(self.config['default_servers']) self.session_pool.start(self.config['default_servers'])
self.on_header.listen(self._update_remote_height) self.on_header.listen(self._update_remote_height)
while self.running:
await self.switch_to_fastest()
async def stop(self): async def stop(self):
self.running = False if self.running:
self.session_pool.stop() self.running = False
self._switch_task.cancel()
self.session_pool.stop()
@property @property
def is_connected(self): def is_connected(self):
@ -329,8 +322,9 @@ class SessionPool:
self._connect_session(server) self._connect_session(server)
def stop(self): def stop(self):
for task in self.sessions.values(): for session, task in self.sessions.items():
task.cancel() task.cancel()
session.synchronous_close()
self.sessions.clear() self.sessions.clear()
def ensure_connections(self): def ensure_connections(self):

View file

@ -190,14 +190,15 @@ class WalletNode:
class SPVNode: class SPVNode:
def __init__(self, coin_class): def __init__(self, coin_class, node_number=1):
self.coin_class = coin_class self.coin_class = coin_class
self.controller = None self.controller = None
self.data_path = None self.data_path = None
self.server = None self.server = None
self.hostname = 'localhost' self.hostname = 'localhost'
self.port = 50001 + 1 # avoid conflict with default daemon self.port = 50001 + node_number # avoid conflict with default daemon
self.session_timeout = 600 self.session_timeout = 600
self.rpc_port = '0' # disabled by default
async def start(self, blockchain_node: 'BlockchainNode'): async def start(self, blockchain_node: 'BlockchainNode'):
self.data_path = tempfile.mkdtemp() self.data_path = tempfile.mkdtemp()
@ -210,6 +211,7 @@ class SPVNode:
'SESSION_TIMEOUT': str(self.session_timeout), 'SESSION_TIMEOUT': str(self.session_timeout),
'MAX_QUERY_WORKERS': '0', 'MAX_QUERY_WORKERS': '0',
'INDIVIDUAL_TAG_INDEXES': '', 'INDIVIDUAL_TAG_INDEXES': '',
'RPC_PORT': self.rpc_port
} }
# TODO: don't use os.environ # TODO: don't use os.environ
os.environ.update(conf) os.environ.update(conf)