forked from LBRYCommunity/lbry-sdk
rolling averages on response time
This commit is contained in:
parent
c826c7da0d
commit
af797296ed
1 changed files with 16 additions and 6 deletions
|
@ -25,6 +25,8 @@ class ClientSession(BaseClientSession):
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.max_seconds_idle = timeout * 2
|
self.max_seconds_idle = timeout * 2
|
||||||
self.response_time: Optional[float] = None
|
self.response_time: Optional[float] = None
|
||||||
|
self._response_samples = 0
|
||||||
|
self.pending_amount = 0
|
||||||
self._on_connect_cb = on_connect_callback or (lambda: None)
|
self._on_connect_cb = on_connect_callback or (lambda: None)
|
||||||
self.trigger_urgent_reconnect = asyncio.Event()
|
self.trigger_urgent_reconnect = asyncio.Event()
|
||||||
|
|
||||||
|
@ -32,24 +34,30 @@ class ClientSession(BaseClientSession):
|
||||||
def available(self):
|
def available(self):
|
||||||
return not self.is_closing() and self._can_send.is_set() and self.response_time is not None
|
return not self.is_closing() and self._can_send.is_set() and self.response_time is not None
|
||||||
|
|
||||||
@property
|
|
||||||
def pending_size(self):
|
|
||||||
return len(self.connection.pending_requests())
|
|
||||||
|
|
||||||
async def send_request(self, method, args=()):
|
async def send_request(self, method, args=()):
|
||||||
try:
|
try:
|
||||||
|
self.pending_amount += 1
|
||||||
start = perf_counter()
|
start = perf_counter()
|
||||||
result = await asyncio.wait_for(
|
result = await asyncio.wait_for(
|
||||||
super().send_request(method, args), timeout=self.timeout
|
super().send_request(method, args), timeout=self.timeout
|
||||||
)
|
)
|
||||||
self.response_time = perf_counter() - start
|
current_response_time = perf_counter() - start
|
||||||
|
if self.response_time:
|
||||||
|
self.response_time = (self.response_time * self._response_samples
|
||||||
|
+ current_response_time) / (self._response_samples + 1)
|
||||||
|
else:
|
||||||
|
self.response_time = current_response_time
|
||||||
|
self._response_samples += 1
|
||||||
return result
|
return result
|
||||||
except RPCError as e:
|
except RPCError as e:
|
||||||
log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args)
|
log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args)
|
||||||
raise e
|
raise e
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
self.response_time = None
|
self.response_time = None
|
||||||
|
self._response_samples = 0
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
self.pending_amount -= 1
|
||||||
|
|
||||||
async def ensure_session(self):
|
async def ensure_session(self):
|
||||||
# Handles reconnecting and maintaining a session alive
|
# Handles reconnecting and maintaining a session alive
|
||||||
|
@ -93,6 +101,7 @@ class ClientSession(BaseClientSession):
|
||||||
log.debug("Connection lost: %s:%d", *self.server)
|
log.debug("Connection lost: %s:%d", *self.server)
|
||||||
super().connection_lost(exc)
|
super().connection_lost(exc)
|
||||||
self.response_time = None
|
self.response_time = None
|
||||||
|
self._response_samples = 0
|
||||||
self._on_disconnect_controller.add(True)
|
self._on_disconnect_controller.add(True)
|
||||||
|
|
||||||
|
|
||||||
|
@ -220,7 +229,8 @@ class SessionPool:
|
||||||
if not self.available_sessions:
|
if not self.available_sessions:
|
||||||
return None
|
return None
|
||||||
return min(
|
return min(
|
||||||
[(session.response_time * session.pending_size, session) for session in self.available_sessions],
|
[(session.response_time * session.pending_amount, session)
|
||||||
|
for session in self.available_sessions],
|
||||||
key=itemgetter(0)
|
key=itemgetter(0)
|
||||||
)[1]
|
)[1]
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue