Compare commits

...

2 commits

Author SHA1 Message Date
Jack Robison
5a39e53ed2
add outbound and incoming connection counts to connection status 2020-04-22 13:38:38 -04:00
Jack Robison
5e8b7e03af
add debug_in_flight api 2020-04-22 13:38:10 -04:00
2 changed files with 34 additions and 5 deletions

View file

@ -42,8 +42,7 @@ class ConnectionManager:
self.outgoing_connected.add(host_and_port) self.outgoing_connected.add(host_and_port)
def connection_received(self, host_and_port: str): def connection_received(self, host_and_port: str):
# self.incoming_connected.add(host_and_port) self.incoming_connected.add(host_and_port)
pass
def outgoing_connection_lost(self, host_and_port: str): def outgoing_connection_lost(self, host_and_port: str):
if self._running and host_and_port in self.outgoing_connected: if self._running and host_and_port in self.outgoing_connected:
@ -62,7 +61,9 @@ class ConnectionManager:
'total_sent': 0, 'total_sent': 0,
'total_received': 0, 'total_received': 0,
'max_incoming_mbs': 0.0, 'max_incoming_mbs': 0.0,
'max_outgoing_mbs': 0.0 'max_outgoing_mbs': 0.0,
'outbound_connections': 0,
'incoming_connections': 0
} }
while True: while True:
@ -87,6 +88,8 @@ class ConnectionManager:
self._max_outgoing_mbs = max(self._max_outgoing_mbs, self._status['total_outgoing_mbs']) self._max_outgoing_mbs = max(self._max_outgoing_mbs, self._status['total_outgoing_mbs'])
self._status['max_incoming_mbs'] = self._max_incoming_mbs self._status['max_incoming_mbs'] = self._max_incoming_mbs
self._status['max_outgoing_mbs'] = self._max_outgoing_mbs self._status['max_outgoing_mbs'] = self._max_outgoing_mbs
self._status['outbound_connections'] = len(self.outgoing_connected)
self._status['incoming_connections'] = len(self.incoming_connected)
def stop(self): def stop(self):
if self._task: if self._task:

View file

@ -331,6 +331,8 @@ class Daemon(metaclass=JSONRPCServerType):
self.need_connection_status_refresh = asyncio.Event() self.need_connection_status_refresh = asyncio.Event()
self._connection_status_task: Optional[asyncio.Task] = None self._connection_status_task: Optional[asyncio.Task] = None
self._received_requests_counter = 0
self._in_flight_requests: typing.Dict[int, typing.Tuple[str, float, typing.Tuple, typing.Dict]] = {}
@property @property
def dht_node(self) -> typing.Optional['Node']: def dht_node(self) -> typing.Optional['Node']:
@ -363,7 +365,7 @@ class Daemon(metaclass=JSONRPCServerType):
@classmethod @classmethod
def get_api_definitions(cls): def get_api_definitions(cls):
prefix = 'jsonrpc_' prefix = 'jsonrpc_'
not_grouped = ['routing_table_get', 'ffmpeg_find'] not_grouped = ['routing_table_get', 'ffmpeg_find', 'debug_in_flight']
api = { api = {
'groups': { 'groups': {
group_name[:-len('_DOC')].lower(): getattr(cls, group_name).strip() group_name[:-len('_DOC')].lower(): getattr(cls, group_name).strip()
@ -615,6 +617,8 @@ class Daemon(metaclass=JSONRPCServerType):
return await self.stream_manager.stream_partial_content(request, sd_hash) return await self.stream_manager.stream_partial_content(request, sd_hash)
async def _process_rpc_call(self, data): async def _process_rpc_call(self, data):
request_id = int(self._received_requests_counter)
self._received_requests_counter += 1
args = data.get('params', {}) args = data.get('params', {})
try: try:
@ -663,7 +667,7 @@ class Daemon(metaclass=JSONRPCServerType):
JSONRPCError.CODE_INVALID_PARAMS, JSONRPCError.CODE_INVALID_PARAMS,
params_error_message, params_error_message,
) )
self._in_flight_requests[request_id] = (function_name, time.perf_counter(), _args, _kwargs)
try: try:
result = method(self, *_args, **_kwargs) result = method(self, *_args, **_kwargs)
if asyncio.iscoroutine(result): if asyncio.iscoroutine(result):
@ -677,6 +681,8 @@ class Daemon(metaclass=JSONRPCServerType):
return JSONRPCError.create_command_exception( return JSONRPCError.create_command_exception(
command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc() command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc()
) )
finally:
del self._in_flight_requests[request_id]
def _verify_method_is_callable(self, function_path): def _verify_method_is_callable(self, function_path):
if function_path not in self.callable_methods: if function_path not in self.callable_methods:
@ -895,6 +901,7 @@ class Daemon(metaclass=JSONRPCServerType):
ffmpeg_status = await self._video_file_analyzer.status() ffmpeg_status = await self._video_file_analyzer.status()
running_components = self.component_manager.get_components_status() running_components = self.component_manager.get_components_status()
response = { response = {
'requests_in_flight': len(self._in_flight_requests),
'installation_id': self.installation_id, 'installation_id': self.installation_id,
'is_running': all(running_components.values()), 'is_running': all(running_components.values()),
'skipped_components': self.component_manager.skip_components, 'skipped_components': self.component_manager.skip_components,
@ -911,6 +918,25 @@ class Daemon(metaclass=JSONRPCServerType):
response[component.component_name] = status response[component.component_name] = status
return response return response
def jsonrpc_debug_in_flight(self):
"""
Debug currently in flight api requests
Usage:
debug_in_flight
Options:
None
Returns:
(dict) Dictionary of in flight requests
"""
now = time.perf_counter()
return {
f"{method}-{req_id}": now - started_ts
for req_id, (method, started_ts, _, _) in self._in_flight_requests.items()
}
def jsonrpc_version(self): # pylint: disable=no-self-use def jsonrpc_version(self): # pylint: disable=no-self-use
""" """
Get lbrynet API server version information Get lbrynet API server version information