pick fastest server on client start

This commit is contained in:
Victor Shyba 2019-06-03 05:21:57 -03:00 committed by Lex Berezhny
parent d23920cc61
commit 8911af58cf
2 changed files with 37 additions and 5 deletions

View file

@ -1,6 +1,7 @@
import logging import logging
import asyncio import asyncio
from torba.rpc import RPCSession
from torba.testcase import IntegrationTestCase from torba.testcase import IntegrationTestCase
@ -39,3 +40,16 @@ class ReconnectTests(IntegrationTestCase):
self.ledger.config['default_servers'] = [('10.0.0.1', 12)] + list(conf['default_servers']) self.ledger.config['default_servers'] = [('10.0.0.1', 12)] + list(conf['default_servers'])
await self.ledger.start() await self.ledger.start()
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected)
async def test_pick_fastest(self):
# local server that is listening but wont reply
proto = RPCSession()
proto.handle_request = lambda _: asyncio.sleep(10)
server = await self.loop.create_server(lambda: proto, host='127.0.0.1', port=1337)
await self.ledger.stop()
conf = self.ledger.config
self.ledger.config['default_servers'] = [('127.0.0.1', 1337)] + list(conf['default_servers'])
self.ledger.config['connect_timeout'] = 30
await asyncio.wait_for(self.ledger.start(), timeout=1)
self.assertTrue(self.ledger.network.is_connected)
self.assertEqual(self.ledger.network.client.server, conf['default_servers'][-1])

View file

@ -1,7 +1,6 @@
import logging import logging
import asyncio import asyncio
from asyncio import CancelledError from asyncio import CancelledError
from itertools import cycle
from time import time from time import time
from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError
@ -75,13 +74,32 @@ class BaseNetwork:
'blockchain.address.subscribe': self._on_status_controller, 'blockchain.address.subscribe': self._on_status_controller,
} }
async def pick_fastest_server(self, timeout):
async def __probe(server):
client = ClientSession(network=self, server=server)
try:
await client.create_connection(timeout)
await client.send_request('server.banner')
return client
except (asyncio.TimeoutError, asyncio.CancelledError) as error:
client.connection_lost(error)
raise error
futures = []
for server in self.config['default_servers']:
futures.append(__probe(server))
done, pending = await asyncio.wait(futures, return_when='FIRST_COMPLETED')
for task in pending:
task.cancel()
for client in done:
return await client
async def start(self): async def start(self):
self.running = True self.running = True
delay = 0.0 delay = 0.0
connect_timeout = self.config.get('connect_timeout', 6) connect_timeout = self.config.get('connect_timeout', 6)
for server in cycle(self.config['default_servers']): while True:
self.client = ClientSession(network=self, server=server) self.client = await self.pick_fastest_server(connect_timeout)
connection_string = '{}:{}'.format(*server) connection_string = '{}:{}'.format(*self.client.server)
try: try:
await self.client.create_connection(connect_timeout) await self.client.create_connection(connect_timeout)
await self.ensure_server_version() await self.ensure_server_version()