added metrics module
This commit is contained in:
parent
71551058ce
commit
49b0f59790
2 changed files with 192 additions and 0 deletions
130
lbry/lbry/wallet/server/metrics.py
Normal file
130
lbry/lbry/wallet/server/metrics.py
Normal file
|
@ -0,0 +1,130 @@
|
||||||
|
import time
|
||||||
|
import math
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_elapsed(start) -> int:
|
||||||
|
return int((time.perf_counter() - start) * 1000)
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_percentiles(data) -> Tuple[int, int, int, int, int, int, int]:
|
||||||
|
if not data:
|
||||||
|
return 0, 0, 0, 0, 0, 0, 0
|
||||||
|
data.sort()
|
||||||
|
size = len(data)
|
||||||
|
return (
|
||||||
|
data[0],
|
||||||
|
data[math.ceil(size * .05) - 1],
|
||||||
|
data[math.ceil(size * .25) - 1],
|
||||||
|
data[math.ceil(size * .50) - 1],
|
||||||
|
data[math.ceil(size * .75) - 1],
|
||||||
|
data[math.ceil(size * .95) - 1],
|
||||||
|
data[-1]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def avg(data) -> int:
|
||||||
|
return int(sum(data) / len(data)) if data else 0
|
||||||
|
|
||||||
|
|
||||||
|
def remove_select_list(sql) -> str:
|
||||||
|
return sql[sql.index('FROM'):]
|
||||||
|
|
||||||
|
|
||||||
|
class APICallMetrics:
|
||||||
|
|
||||||
|
def __init__(self, name):
|
||||||
|
self.name = name
|
||||||
|
# total counts
|
||||||
|
self.cache_hits = 0
|
||||||
|
self.started = 0
|
||||||
|
self.errored = 0
|
||||||
|
self.errored_queries = set()
|
||||||
|
self.interrupted = 0
|
||||||
|
self.interrupted_queries = set()
|
||||||
|
# timings
|
||||||
|
self.command_total_times = []
|
||||||
|
self.command_query_times = []
|
||||||
|
self.command_execution_times = []
|
||||||
|
self.command_wait_times = []
|
||||||
|
self.individual_query_times = []
|
||||||
|
|
||||||
|
def to_json_and_reset(self):
|
||||||
|
return {
|
||||||
|
# total counts
|
||||||
|
"cache_hits_count": self.cache_hits,
|
||||||
|
"started_count": self.started,
|
||||||
|
"finished_count": len(self.command_total_times),
|
||||||
|
"errored_count": self.errored,
|
||||||
|
"errored_queries": list(self.errored_queries),
|
||||||
|
"interrupted_count": self.interrupted,
|
||||||
|
"interrupted_queries": list(self.interrupted_queries),
|
||||||
|
"individual_queries_count": len(self.individual_query_times),
|
||||||
|
# timings and percentiles
|
||||||
|
"total_avg": avg(self.command_total_times),
|
||||||
|
"total_percentiles": calculate_percentiles(self.command_total_times),
|
||||||
|
"query_avg": avg(self.command_query_times),
|
||||||
|
"query_percentiles": calculate_percentiles(self.command_query_times),
|
||||||
|
"execution_avg": avg(self.command_execution_times),
|
||||||
|
"execution_percentiles": calculate_percentiles(self.command_execution_times),
|
||||||
|
"wait_avg": avg(self.command_wait_times),
|
||||||
|
"wait_percentiles": calculate_percentiles(self.command_wait_times),
|
||||||
|
"individual_query_avg": avg(self.individual_query_times),
|
||||||
|
"individual_query_percentiles": calculate_percentiles(self.individual_query_times),
|
||||||
|
}
|
||||||
|
|
||||||
|
def cache_hit(self):
|
||||||
|
self.cache_hits += 1
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.started += 1
|
||||||
|
|
||||||
|
def finish(self, start, metrics):
|
||||||
|
self.command_total_times.append(calculate_elapsed(start))
|
||||||
|
if metrics and 'execute_query' in metrics:
|
||||||
|
query_times = [f['total'] for f in metrics['execute_query']]
|
||||||
|
self.individual_query_times.extend(query_times)
|
||||||
|
command_query_time = sum(query_times)
|
||||||
|
self.command_query_times.append(command_query_time)
|
||||||
|
self.command_execution_times.append(
|
||||||
|
metrics[self.name][0]['total'] - command_query_time
|
||||||
|
)
|
||||||
|
self.command_wait_times.append(
|
||||||
|
self.command_total_times[-1] - metrics[self.name][0]['total']
|
||||||
|
)
|
||||||
|
|
||||||
|
def _add_queries(self, metrics, query_set):
|
||||||
|
if metrics and 'execute_query' in metrics:
|
||||||
|
for execute_query in metrics['execute_query']:
|
||||||
|
if 'sql' in execute_query:
|
||||||
|
query_set.add(remove_select_list(execute_query['sql']))
|
||||||
|
|
||||||
|
def interrupt(self, start, metrics):
|
||||||
|
self.finish(start, metrics)
|
||||||
|
self._add_queries(metrics, self.interrupted_queries)
|
||||||
|
|
||||||
|
def error(self, start, metrics=None):
|
||||||
|
self.errored += 1
|
||||||
|
if metrics:
|
||||||
|
self.finish(start, metrics)
|
||||||
|
self._add_queries(metrics, self.errored_queries)
|
||||||
|
|
||||||
|
|
||||||
|
class ServerLoadData:
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._apis = {}
|
||||||
|
|
||||||
|
def for_api(self, name) -> APICallMetrics:
|
||||||
|
if name not in self._apis:
|
||||||
|
self._apis[name] = APICallMetrics(name)
|
||||||
|
return self._apis[name]
|
||||||
|
|
||||||
|
def to_json_and_reset(self, server):
|
||||||
|
try:
|
||||||
|
return {
|
||||||
|
'api': {name: api.to_json_and_reset() for name, api in self._apis.items()},
|
||||||
|
'server': server
|
||||||
|
}
|
||||||
|
finally:
|
||||||
|
self._apis = {}
|
62
lbry/tests/unit/wallet/server/test_metrics.py
Normal file
62
lbry/tests/unit/wallet/server/test_metrics.py
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from lbry.wallet.server.metrics import ServerLoadData, calculate_percentiles
|
||||||
|
|
||||||
|
|
||||||
|
class TestPercentileCalculation(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_calculate_percentiles(self):
|
||||||
|
self.assertEqual(calculate_percentiles([]), [0, 0, 0, 0, 0, 0, 0])
|
||||||
|
self.assertEqual(calculate_percentiles([1]), [1, 1, 1, 1, 1, 1, 1])
|
||||||
|
self.assertEqual(calculate_percentiles([1, 2]), [1, 1, 1, 1, 2, 2, 2])
|
||||||
|
self.assertEqual(calculate_percentiles([1, 2, 3]), [1, 1, 1, 2, 3, 3, 3])
|
||||||
|
self.assertEqual(calculate_percentiles([1, 2, 3, 4]), [1, 1, 1, 2, 3, 4, 4])
|
||||||
|
self.assertEqual(calculate_percentiles([1, 2, 3, 4, 5, 6]), [1, 1, 2, 3, 5, 6, 6])
|
||||||
|
self.assertEqual(calculate_percentiles(list(range(1, 101))), [1, 5, 25, 50, 75, 95, 100])
|
||||||
|
|
||||||
|
|
||||||
|
class TestCollectingMetrics(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_happy_path(self):
|
||||||
|
self.maxDiff = None
|
||||||
|
load = ServerLoadData()
|
||||||
|
search = load.for_api('search')
|
||||||
|
self.assertEqual(search.name, 'search')
|
||||||
|
search.start()
|
||||||
|
search.cache_hit()
|
||||||
|
search.cache_hit()
|
||||||
|
metrics = {
|
||||||
|
'search': [{'total': 40}],
|
||||||
|
'execute_query': [
|
||||||
|
{'total': 20},
|
||||||
|
{'total': 10}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
for x in range(5):
|
||||||
|
search.finish(time.perf_counter() - 0.055 + 0.001*x, metrics)
|
||||||
|
metrics['execute_query'][0]['total'] = 10
|
||||||
|
metrics['execute_query'][0]['sql'] = "select lots, of, stuff FROM claim where something=1"
|
||||||
|
search.interrupt(time.perf_counter() - 0.050, metrics)
|
||||||
|
search.error(time.perf_counter() - 0.050, metrics)
|
||||||
|
search.error(time.perf_counter() - 0.052)
|
||||||
|
self.assertEqual(load.to_json_and_reset({}), {'status': {}, 'api': {'search': {
|
||||||
|
'cache_hits_count': 2,
|
||||||
|
'errored_count': 2,
|
||||||
|
'errored_queries': ['FROM claim where something=1'],
|
||||||
|
'execution_avg': 12,
|
||||||
|
'execution_percentiles': (10, 10, 10, 10, 20, 20, 20),
|
||||||
|
'finished_count': 7,
|
||||||
|
'individual_queries_count': 14,
|
||||||
|
'individual_query_avg': 13,
|
||||||
|
'individual_query_percentiles': (10, 10, 10, 10, 20, 20, 20),
|
||||||
|
'interrupted_count': 0,
|
||||||
|
'interrupted_queries': ['FROM claim where something=1'],
|
||||||
|
'query_avg': 27,
|
||||||
|
'query_percentiles': (20, 20, 20, 30, 30, 30, 30),
|
||||||
|
'started_count': 1,
|
||||||
|
'total_avg': 52,
|
||||||
|
'total_percentiles': (50, 50, 50, 52, 54, 55, 55),
|
||||||
|
'wait_avg': 12,
|
||||||
|
'wait_percentiles': (10, 10, 10, 12, 14, 15, 15)
|
||||||
|
}}})
|
||||||
|
self.assertEqual(load.to_json_and_reset({}), {'api': {}})
|
Loading…
Add table
Reference in a new issue