metrics reporting fix for wallet servers
parent ff4b2f2dec
commit 8a284d6c49
2 changed files with 20 additions and 8 deletions
@@ -52,7 +52,7 @@ class APICallMetrics:
         self.errored_queries = set()
         self.interrupted_queries = set()
 
-    def to_json_and_reset(self):
+    def to_json(self):
         return {
             # total requests received
             "receive_count": self.receive_count,
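The rename narrows APICallMetrics to pure serialization: to_json no longer resets anything, leaving the reset to the owning ServerLoadData (next hunk). A minimal sketch of the resulting shape; receive_count and the two query sets appear in the diff, while the remaining fields and the len() serialization of the sets are assumptions:

class APICallMetrics:
    def __init__(self, name):
        self.name = name
        self.receive_count = 0
        self.errored_queries = set()
        self.interrupted_queries = set()

    def to_json(self):
        # serialize without mutating counters; the owner decides when to reset
        return {
            "receive_count": self.receive_count,
            "errored_queries": len(self.errored_queries),
            "interrupted_queries": len(self.interrupted_queries),
        }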
@@ -128,7 +128,7 @@ class ServerLoadData:
     def to_json_and_reset(self, status):
         try:
             return {
-                'api': {name: api.to_json_and_reset() for name, api in self._apis.items()},
+                'api': {name: api.to_json() for name, api in self._apis.items()},
                 'status': status
             }
         finally:
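With the per-API objects no longer resetting themselves, ServerLoadData.to_json_and_reset is the single place that both snapshots and clears state; the finally: shown above runs after the return value has been built. The body of that finally block is outside this hunk, so the following is only a sketch of the likely arrangement, assuming the reset simply discards the old APICallMetrics instances:

class ServerLoadData:
    def __init__(self):
        self._apis = {}

    def for_api(self, name):
        # referenced by the session code as session_mgr.metrics.for_api(...)
        if name not in self._apis:
            self._apis[name] = APICallMetrics(name)
        return self._apis[name]

    def to_json_and_reset(self, status):
        try:
            return {
                'api': {name: api.to_json() for name, api in self._apis.items()},
                'status': status
            }
        finally:
            # assumption: dropping the old objects is the reset, which is why
            # callers must never cache a reference to them across an await
            self._apis = {}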
@@ -112,33 +112,44 @@ class LBRYElectrumX(ElectrumX):
         }
         self.request_handlers.update(handlers)
 
-    async def run_in_executor(self, metrics: APICallMetrics, func, kwargs):
+    def get_metrics_or_placeholder_for_api(self, query_name):
+        """ Do not hold on to a reference to the metrics
+            returned by this method past an `await` or
+            you may be working with a stale metrics object.
+        """
+        if self.env.track_metrics:
+            return self.session_mgr.metrics.for_api(query_name)
+        else:
+            return APICallMetrics(query_name)
+
+    async def run_in_executor(self, query_name, func, kwargs):
         start = time.perf_counter()
         try:
             result = await asyncio.get_running_loop().run_in_executor(
                 self.session_mgr.query_executor, func, kwargs
             )
         except reader.SQLiteInterruptedError as error:
+            metrics = self.get_metrics_or_placeholder_for_api(query_name)
             metrics.query_interrupt(start, error.metrics)
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
         except reader.SQLiteOperationalError as error:
+            metrics = self.get_metrics_or_placeholder_for_api(query_name)
             metrics.query_error(start, error.metrics)
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
         except:
+            metrics = self.get_metrics_or_placeholder_for_api(query_name)
             metrics.query_error(start, {})
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
 
         if self.env.track_metrics:
+            metrics = self.get_metrics_or_placeholder_for_api(query_name)
             (result, metrics_data) = result
             metrics.query_response(start, metrics_data)
 
         return base64.b64encode(result).decode()
 
     async def run_and_cache_query(self, query_name, function, kwargs):
-        if self.env.track_metrics:
-            metrics = self.session_mgr.metrics.for_api(query_name)
-        else:
-            metrics = APICallMetrics(query_name)
+        metrics = self.get_metrics_or_placeholder_for_api(query_name)
         metrics.start()
         cache = self.session_mgr.search_cache[query_name]
         cache_key = str(kwargs)
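This hunk is the actual fix. run_in_executor used to receive an APICallMetrics object that the caller had looked up before any awaiting happened; by the time the executor returned, ServerLoadData could already have been serialized and reset, so the response, error, or interrupt was recorded on an object that no longer feeds the report. Passing query_name instead and re-resolving the metrics after each await keeps the bookkeeping on the live object. The hazard can be reproduced with a stripped-down registry; the Registry class and its dict counters below are illustrative stand-ins, not the wallet server's API:

import asyncio


class Registry:
    """Stand-in for the session manager's metrics container."""

    def __init__(self):
        self.counts = {}

    def for_api(self, name):
        return self.counts.setdefault(name, {"responses": 0})

    def reset(self):
        # what a reporting pass effectively does: old dicts are discarded
        self.counts = {}


registry = Registry()


async def stale(name):
    metrics = registry.for_api(name)   # reference taken before the await
    await asyncio.sleep(0)             # a reset can run while we are suspended
    metrics["responses"] += 1          # increments a dict nobody reports anymore


async def fresh(name):
    await asyncio.sleep(0)
    registry.for_api(name)["responses"] += 1   # re-resolved after the await


async def main():
    task = asyncio.create_task(stale("search"))
    await asyncio.sleep(0)     # let stale() reach its await
    registry.reset()           # a reporting cycle clears the counters
    await task
    print(registry.for_api("search")["responses"])   # 0: the increment was lost

    registry.reset()
    task = asyncio.create_task(fresh("search"))
    await asyncio.sleep(0)
    registry.reset()
    await task
    print(registry.for_api("search")["responses"])   # 1: survives the reset


asyncio.run(main())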
@@ -151,9 +162,10 @@ class LBRYElectrumX(ElectrumX):
         async with cache_item.lock:
             if cache_item.result is None:
                 cache_item.result = await self.run_in_executor(
-                    metrics, function, kwargs
+                    query_name, function, kwargs
                 )
             else:
+                metrics = self.get_metrics_or_placeholder_for_api(query_name)
                 metrics.cache_response()
             return cache_item.result
 
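The cache-hit branch follows the same rule: async with cache_item.lock is itself an await point, so the metrics object is fetched only after the lock has been acquired, immediately before cache_response() is recorded, and run_in_executor is handed the query_name rather than a possibly stale object. The cache item itself is not defined in this diff; its assumed shape, based only on the .lock and .result attributes used above:

import asyncio

class CacheItem:
    # assumed shape: the diff only shows .lock and .result being used
    def __init__(self):
        self.lock = asyncio.Lock()
        self.result = None

Two callers arriving with the same cache_key then serialize on the lock: the first fills result through run_in_executor, and the second records a cache hit against whatever metrics object is current once it finally holds the lock.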