include connection latency in server selection

This commit is contained in:
Jack Robison 2019-08-20 15:11:01 -04:00
parent a5af3866fd
commit c590954eb9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -40,26 +40,30 @@ class ClientSession(BaseClientSession):
return self.transport.get_extra_info('peername') return self.transport.get_extra_info('peername')
async def send_timed_server_version_request(self, args=()): async def send_timed_server_version_request(self, args=()):
log.debug("send version request to %s:%i", *self.server)
start = perf_counter() start = perf_counter()
result = await asyncio.wait_for( result = await asyncio.wait_for(
super().send_request('server.version', args), timeout=self.timeout super().send_request('server.version', args), timeout=self.timeout
) )
current_response_time = perf_counter() - start current_response_time = perf_counter() - start
response_sum = self.response_time * self._response_samples + current_response_time response_sum = (self.response_time or 0) * self._response_samples + current_response_time
self.response_time = response_sum / (self._response_samples + 1) self.response_time = response_sum / (self._response_samples + 1)
self._response_samples += 1 self._response_samples += 1
return result return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
self.pending_amount += 1
try:
if method == 'server.version': if method == 'server.version':
return await self.send_timed_server_version_request(args) return await self.send_timed_server_version_request(args)
try:
return await asyncio.wait_for( return await asyncio.wait_for(
super().send_request(method, args), timeout=self.timeout super().send_request(method, args), timeout=self.timeout
) )
except RPCError as e: except RPCError as e:
log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args)
raise e raise e
finally:
self.pending_amount -= 1
async def ensure_session(self): async def ensure_session(self):
# Handles reconnecting and maintaining a session alive # Handles reconnecting and maintaining a session alive
@ -107,6 +111,7 @@ class ClientSession(BaseClientSession):
self.response_time = None self.response_time = None
self.connection_latency = None self.connection_latency = None
self._response_samples = 0 self._response_samples = 0
self.pending_amount = 0
self._on_disconnect_controller.add(True) self._on_disconnect_controller.add(True)
@ -234,7 +239,7 @@ class SessionPool:
if not self.available_sessions: if not self.available_sessions:
return None return None
return min( return min(
[(session.response_time * session.pending_amount, session) [((session.response_time + session.connection_latency) * (session.pending_amount + 1), session)
for session in self.available_sessions], for session in self.available_sessions],
key=itemgetter(0) key=itemgetter(0)
)[1] )[1]