track response time for server.version only

This commit is contained in:
Jack Robison 2019-08-20 14:52:49 -04:00
parent 0e48772c84
commit 9045737504
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -1,7 +1,7 @@
import logging import logging
import asyncio import asyncio
from operator import itemgetter from operator import itemgetter
from typing import Dict, Optional from typing import Dict, Optional, Tuple
from time import time, perf_counter from time import time, perf_counter
from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError
@ -13,7 +13,6 @@ log = logging.getLogger(__name__)
class ClientSession(BaseClientSession): class ClientSession(BaseClientSession):
def __init__(self, *args, network, server, timeout=30, on_connect_callback=None, **kwargs): def __init__(self, *args, network, server, timeout=30, on_connect_callback=None, **kwargs):
self.network = network self.network = network
self.server = server self.server = server
@ -35,30 +34,32 @@ class ClientSession(BaseClientSession):
def available(self): def available(self):
return not self.is_closing() and self._can_send.is_set() and self.response_time is not None return not self.is_closing() and self._can_send.is_set() and self.response_time is not None
@property
def server_address_and_port(self) -> Optional[Tuple[str, int]]:
if self.transport:
return self.transport.get_extra_info('peername')
async def send_timed_server_version_request(self, args=()):
start = perf_counter()
result = await asyncio.wait_for(
super().send_request('server.version', args), timeout=self.timeout
)
current_response_time = perf_counter() - start
response_sum = self.response_time * self._response_samples + current_response_time
self.response_time = response_sum / (self._response_samples + 1)
self._response_samples += 1
return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
if method == 'server.version':
return await self.send_timed_server_version_request(args)
try: try:
self.pending_amount += 1 return await asyncio.wait_for(
start = perf_counter()
result = await asyncio.wait_for(
super().send_request(method, args), timeout=self.timeout super().send_request(method, args), timeout=self.timeout
) )
current_response_time = perf_counter() - start
if self.response_time:
self.response_time = (self.response_time * self._response_samples
+ current_response_time) / (self._response_samples + 1)
else:
self.response_time = current_response_time
self._response_samples += 1
return result
except RPCError as e: except RPCError as e:
log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args)
raise e raise e
except TimeoutError:
self.response_time = None
self._response_samples = 0
raise
finally:
self.pending_amount -= 1
async def ensure_session(self): async def ensure_session(self):
# Handles reconnecting and maintaining a session alive # Handles reconnecting and maintaining a session alive
@ -70,8 +71,8 @@ class ClientSession(BaseClientSession):
await self.create_connection(self.timeout) await self.create_connection(self.timeout)
await self.ensure_server_version() await self.ensure_server_version()
self._on_connect_cb() self._on_connect_cb()
if (time() - self.last_send) > self.max_seconds_idle or self.response_time is None: if (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None:
await self.send_request('server.banner') await self.ensure_server_version()
retry_delay = default_delay retry_delay = default_delay
except (asyncio.TimeoutError, OSError): except (asyncio.TimeoutError, OSError):
await self.close() await self.close()