Compare commits
2 commits
master
...
debug-in-f
Author | SHA1 | Date | |
---|---|---|---|
|
5a39e53ed2 | ||
|
5e8b7e03af |
2 changed files with 34 additions and 5 deletions
|
@ -42,8 +42,7 @@ class ConnectionManager:
|
||||||
self.outgoing_connected.add(host_and_port)
|
self.outgoing_connected.add(host_and_port)
|
||||||
|
|
||||||
def connection_received(self, host_and_port: str):
|
def connection_received(self, host_and_port: str):
|
||||||
# self.incoming_connected.add(host_and_port)
|
self.incoming_connected.add(host_and_port)
|
||||||
pass
|
|
||||||
|
|
||||||
def outgoing_connection_lost(self, host_and_port: str):
|
def outgoing_connection_lost(self, host_and_port: str):
|
||||||
if self._running and host_and_port in self.outgoing_connected:
|
if self._running and host_and_port in self.outgoing_connected:
|
||||||
|
@ -62,7 +61,9 @@ class ConnectionManager:
|
||||||
'total_sent': 0,
|
'total_sent': 0,
|
||||||
'total_received': 0,
|
'total_received': 0,
|
||||||
'max_incoming_mbs': 0.0,
|
'max_incoming_mbs': 0.0,
|
||||||
'max_outgoing_mbs': 0.0
|
'max_outgoing_mbs': 0.0,
|
||||||
|
'outbound_connections': 0,
|
||||||
|
'incoming_connections': 0
|
||||||
}
|
}
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
@ -87,6 +88,8 @@ class ConnectionManager:
|
||||||
self._max_outgoing_mbs = max(self._max_outgoing_mbs, self._status['total_outgoing_mbs'])
|
self._max_outgoing_mbs = max(self._max_outgoing_mbs, self._status['total_outgoing_mbs'])
|
||||||
self._status['max_incoming_mbs'] = self._max_incoming_mbs
|
self._status['max_incoming_mbs'] = self._max_incoming_mbs
|
||||||
self._status['max_outgoing_mbs'] = self._max_outgoing_mbs
|
self._status['max_outgoing_mbs'] = self._max_outgoing_mbs
|
||||||
|
self._status['outbound_connections'] = len(self.outgoing_connected)
|
||||||
|
self._status['incoming_connections'] = len(self.incoming_connected)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self._task:
|
if self._task:
|
||||||
|
|
|
@ -331,6 +331,8 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
|
|
||||||
self.need_connection_status_refresh = asyncio.Event()
|
self.need_connection_status_refresh = asyncio.Event()
|
||||||
self._connection_status_task: Optional[asyncio.Task] = None
|
self._connection_status_task: Optional[asyncio.Task] = None
|
||||||
|
self._received_requests_counter = 0
|
||||||
|
self._in_flight_requests: typing.Dict[int, typing.Tuple[str, float, typing.Tuple, typing.Dict]] = {}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dht_node(self) -> typing.Optional['Node']:
|
def dht_node(self) -> typing.Optional['Node']:
|
||||||
|
@ -363,7 +365,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_api_definitions(cls):
|
def get_api_definitions(cls):
|
||||||
prefix = 'jsonrpc_'
|
prefix = 'jsonrpc_'
|
||||||
not_grouped = ['routing_table_get', 'ffmpeg_find']
|
not_grouped = ['routing_table_get', 'ffmpeg_find', 'debug_in_flight']
|
||||||
api = {
|
api = {
|
||||||
'groups': {
|
'groups': {
|
||||||
group_name[:-len('_DOC')].lower(): getattr(cls, group_name).strip()
|
group_name[:-len('_DOC')].lower(): getattr(cls, group_name).strip()
|
||||||
|
@ -615,6 +617,8 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
return await self.stream_manager.stream_partial_content(request, sd_hash)
|
return await self.stream_manager.stream_partial_content(request, sd_hash)
|
||||||
|
|
||||||
async def _process_rpc_call(self, data):
|
async def _process_rpc_call(self, data):
|
||||||
|
request_id = int(self._received_requests_counter)
|
||||||
|
self._received_requests_counter += 1
|
||||||
args = data.get('params', {})
|
args = data.get('params', {})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -663,7 +667,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
JSONRPCError.CODE_INVALID_PARAMS,
|
JSONRPCError.CODE_INVALID_PARAMS,
|
||||||
params_error_message,
|
params_error_message,
|
||||||
)
|
)
|
||||||
|
self._in_flight_requests[request_id] = (function_name, time.perf_counter(), _args, _kwargs)
|
||||||
try:
|
try:
|
||||||
result = method(self, *_args, **_kwargs)
|
result = method(self, *_args, **_kwargs)
|
||||||
if asyncio.iscoroutine(result):
|
if asyncio.iscoroutine(result):
|
||||||
|
@ -677,6 +681,8 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
return JSONRPCError.create_command_exception(
|
return JSONRPCError.create_command_exception(
|
||||||
command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc()
|
command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc()
|
||||||
)
|
)
|
||||||
|
finally:
|
||||||
|
del self._in_flight_requests[request_id]
|
||||||
|
|
||||||
def _verify_method_is_callable(self, function_path):
|
def _verify_method_is_callable(self, function_path):
|
||||||
if function_path not in self.callable_methods:
|
if function_path not in self.callable_methods:
|
||||||
|
@ -895,6 +901,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
ffmpeg_status = await self._video_file_analyzer.status()
|
ffmpeg_status = await self._video_file_analyzer.status()
|
||||||
running_components = self.component_manager.get_components_status()
|
running_components = self.component_manager.get_components_status()
|
||||||
response = {
|
response = {
|
||||||
|
'requests_in_flight': len(self._in_flight_requests),
|
||||||
'installation_id': self.installation_id,
|
'installation_id': self.installation_id,
|
||||||
'is_running': all(running_components.values()),
|
'is_running': all(running_components.values()),
|
||||||
'skipped_components': self.component_manager.skip_components,
|
'skipped_components': self.component_manager.skip_components,
|
||||||
|
@ -911,6 +918,25 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
response[component.component_name] = status
|
response[component.component_name] = status
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
def jsonrpc_debug_in_flight(self):
|
||||||
|
"""
|
||||||
|
Debug currently in flight api requests
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
debug_in_flight
|
||||||
|
|
||||||
|
Options:
|
||||||
|
None
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(dict) Dictionary of in flight requests
|
||||||
|
"""
|
||||||
|
now = time.perf_counter()
|
||||||
|
return {
|
||||||
|
f"{method}-{req_id}": now - started_ts
|
||||||
|
for req_id, (method, started_ts, _, _) in self._in_flight_requests.items()
|
||||||
|
}
|
||||||
|
|
||||||
def jsonrpc_version(self): # pylint: disable=no-self-use
|
def jsonrpc_version(self): # pylint: disable=no-self-use
|
||||||
"""
|
"""
|
||||||
Get lbrynet API server version information
|
Get lbrynet API server version information
|
||||||
|
|
Loading…
Add table
Reference in a new issue