more metrics cleanup

Lex Berezhny 2019-07-18 19:15:01 -04:00
parent d7ec3c182d
commit 08da763327
4 changed files with 115 additions and 117 deletions


@@ -169,7 +169,7 @@ def execute_query(sql, values) -> List:
     except sqlite3.OperationalError as err:
         plain_sql = interpolate(sql, values)
         if context.is_tracking_metrics:
-            context.metric['execute_query'][-1]['sql'] = plain_sql
+            context.metrics['execute_query'][-1]['sql'] = plain_sql
         if str(err) == "interrupted":
             context.log.warning("interrupted slow sqlite query:\n%s", plain_sql)
             raise SQLiteInterruptedError(context.metrics)


@@ -7,12 +7,13 @@ def calculate_elapsed(start) -> int:
     return int((time.perf_counter() - start) * 1000)


-def calculate_percentiles(data) -> Tuple[int, int, int, int, int, int, int]:
+def calculate_avg_percentiles(data) -> Tuple[int, int, int, int, int, int, int, int]:
     if not data:
-        return 0, 0, 0, 0, 0, 0, 0
+        return 0, 0, 0, 0, 0, 0, 0, 0
     data.sort()
     size = len(data)
     return (
+        int(sum(data) / size),
         data[0],
         data[math.ceil(size * .05) - 1],
         data[math.ceil(size * .25) - 1],
@@ -23,10 +24,6 @@ def calculate_percentiles(data) -> Tuple[int, int, int, int, int, int, int]:
     )


-def avg(data) -> int:
-    return int(sum(data) / len(data)) if data else 0
-
-
 def remove_select_list(sql) -> str:
     return sql[sql.index('FROM'):]

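For reference, a minimal runnable sketch of the renamed function in lbry.wallet.server.metrics. The unchanged lines between the two hunks above (the .50/.75/.95 entries and the max) are not shown in this view; they are filled in here by inference from the updated tests at the bottom of this commit, so treat the sketch as a best guess rather than the exact source:

    import math
    from typing import Tuple

    def calculate_avg_percentiles(data) -> Tuple[int, int, int, int, int, int, int, int]:
        # empty input yields all zeros, one slot per stat
        if not data:
            return 0, 0, 0, 0, 0, 0, 0, 0
        data.sort()
        size = len(data)
        return (
            int(sum(data) / size),            # average, the newly prepended slot
            data[0],                          # min
            data[math.ceil(size * .05) - 1],  # 5th percentile
            data[math.ceil(size * .25) - 1],  # 25th percentile
            data[math.ceil(size * .50) - 1],  # median (inferred, unchanged context)
            data[math.ceil(size * .75) - 1],  # 75th percentile (inferred)
            data[math.ceil(size * .95) - 1],  # 95th percentile (inferred)
            data[-1],                         # max (inferred)
        )

    print(calculate_avg_percentiles(list(range(1, 101))))
    # (50, 1, 5, 25, 50, 75, 95, 100), matching the updated test expectation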
@@ -35,79 +32,87 @@ class APICallMetrics:

     def __init__(self, name):
         self.name = name
-        # total counts
-        self.cache_hits = 0
-        self.started = 0
-        self.errored = 0
+        # total requests received
+        self.receive_count = 0
+        self.cache_response_count = 0
+        # millisecond timings for query based responses
+        self.query_response_times = []
+        self.query_intrp_times = []
+        self.query_error_times = []
+        self.query_python_times = []
+        self.query_wait_times = []
+        self.query_sql_times = []  # aggregate total of multiple SQL calls made per request
+        self.individual_sql_times = []  # every SQL query run on server
+        # actual queries
         self.errored_queries = set()
-        self.interrupted = 0
         self.interrupted_queries = set()
-        # timings
-        self.command_total_times = []
-        self.command_query_times = []
-        self.command_execution_times = []
-        self.command_wait_times = []
-        self.individual_query_times = []

     def to_json_and_reset(self):
         return {
-            # total counts
-            "cache_hits_count": self.cache_hits,
-            "started_count": self.started,
-            "finished_count": len(self.command_total_times),
-            "errored_count": self.errored,
+            # total requests received
+            "receive_count": self.receive_count,
+            # sum of these is total responses made
+            "cache_response_count": self.cache_response_count,
+            "query_response_count": len(self.query_response_times),
+            "intrp_response_count": len(self.query_intrp_times),
+            "error_response_count": len(self.query_error_times),
+            # millisecond timings for non-cache responses
+            "response": calculate_avg_percentiles(self.query_response_times),
+            "interrupt": calculate_avg_percentiles(self.query_intrp_times),
+            "error": calculate_avg_percentiles(self.query_error_times),
+            # response, interrupt and error each also report the python, wait and sql stats:
+            "python": calculate_avg_percentiles(self.query_python_times),
+            "wait": calculate_avg_percentiles(self.query_wait_times),
+            "sql": calculate_avg_percentiles(self.query_sql_times),
+            # extended timings for individual sql executions
+            "individual_sql": calculate_avg_percentiles(self.individual_sql_times),
+            "individual_sql_count": len(self.individual_sql_times),
+            # actual queries
             "errored_queries": list(self.errored_queries),
-            "interrupted_count": self.interrupted,
             "interrupted_queries": list(self.interrupted_queries),
-            "individual_queries_count": len(self.individual_query_times),
-            # timings and percentiles
-            "total_avg": avg(self.command_total_times),
-            "total_percentiles": calculate_percentiles(self.command_total_times),
-            "query_avg": avg(self.command_query_times),
-            "query_percentiles": calculate_percentiles(self.command_query_times),
-            "execution_avg": avg(self.command_execution_times),
-            "execution_percentiles": calculate_percentiles(self.command_execution_times),
-            "wait_avg": avg(self.command_wait_times),
-            "wait_percentiles": calculate_percentiles(self.command_wait_times),
-            "individual_query_avg": avg(self.individual_query_times),
-            "individual_query_percentiles": calculate_percentiles(self.individual_query_times),
         }

-    def cache_hit(self):
-        self.cache_hits += 1
-
     def start(self):
-        self.started += 1
+        self.receive_count += 1

-    def finish(self, start, metrics):
-        self.command_total_times.append(calculate_elapsed(start))
+    def cache_response(self):
+        self.cache_response_count += 1
+
+    def _add_query_timings(self, request_total_time, metrics):
         if metrics and 'execute_query' in metrics:
-            query_times = [f['total'] for f in metrics['execute_query']]
-            self.individual_query_times.extend(query_times)
-            command_query_time = sum(query_times)
-            self.command_query_times.append(command_query_time)
-            self.command_execution_times.append(
-                metrics[self.name][0]['total'] - command_query_time
-            )
-            self.command_wait_times.append(
-                self.command_total_times[-1] - metrics[self.name][0]['total']
-            )
+            sub_process_total = metrics[self.name][0]['total']
+            individual_query_times = [f['total'] for f in metrics['execute_query']]
+            aggregated_query_time = sum(individual_query_times)
+            self.individual_sql_times.extend(individual_query_times)
+            self.query_sql_times.append(aggregated_query_time)
+            self.query_python_times.append(sub_process_total - aggregated_query_time)
+            self.query_wait_times.append(request_total_time - sub_process_total)

-    def _add_queries(self, metrics, query_set):
+    @staticmethod
+    def _add_queries(query_set, metrics):
         if metrics and 'execute_query' in metrics:
             for execute_query in metrics['execute_query']:
                 if 'sql' in execute_query:
                     query_set.add(remove_select_list(execute_query['sql']))

-    def interrupt(self, start, metrics):
-        self.finish(start, metrics)
-        self._add_queries(metrics, self.interrupted_queries)
+    def query_response(self, start, metrics):
+        self.query_response_times.append(calculate_elapsed(start))
+        self._add_query_timings(self.query_response_times[-1], metrics)

-    def error(self, start, metrics=None):
-        self.errored += 1
-        if metrics:
-            self.finish(start, metrics)
-            self._add_queries(metrics, self.errored_queries)
+    def query_interrupt(self, start, metrics):
+        self.query_intrp_times.append(calculate_elapsed(start))
+        self._add_queries(self.interrupted_queries, metrics)
+        self._add_query_timings(self.query_intrp_times[-1], metrics)
+
+    def query_error(self, start, metrics):
+        self.query_error_times.append(calculate_elapsed(start))
+        self._add_queries(self.errored_queries, metrics)
+        self._add_query_timings(self.query_error_times[-1], metrics)


 class ServerLoadData:
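The new _add_query_timings splits each request's wall-clock time into three buckets: SQL time (sum of the per-query totals), Python time (executor total minus SQL time), and wait time (request total minus executor total). A minimal sketch with made-up millisecond values, whose shape mirrors the test fixture at the bottom of this commit:

    # hypothetical numbers: 55ms wall clock, 40ms inside the executor, two SQL calls
    metrics = {
        'search': [{'total': 40}],                       # time spent inside the executor
        'execute_query': [{'total': 20}, {'total': 10}]  # each SQL call's own time
    }
    request_total_time = 55                              # wall clock for the whole request

    sub_process_total = metrics['search'][0]['total']            # 40
    individual = [q['total'] for q in metrics['execute_query']]  # [20, 10]
    sql = sum(individual)                                        # 30ms in SQLite
    python = sub_process_total - sql                             # 10ms of Python work
    wait = request_total_time - sub_process_total                # 15ms waiting for an executor slot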


@@ -15,7 +15,7 @@ from lbry.wallet.server.block_processor import LBRYBlockProcessor
 from lbry.wallet.server.db.writer import LBRYDB
 from lbry.wallet.server.db import reader
 from lbry.wallet.server.websocket import AdminWebSocket
-from lbry.wallet.server.metrics import ServerLoadData
+from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics


 class ResultCacheItem:
@@ -109,55 +109,50 @@ class LBRYElectrumX(ElectrumX):
         }
         self.request_handlers.update(handlers)

-    async def run_in_executor(self, name, func, kwargs):
-        result = start = api = None
-        if self.env.track_metrics:
-            api = self.session_mgr.metrics.for_api(name)
-            api.start()
+    async def run_in_executor(self, metrics: APICallMetrics, func, kwargs):
         start = time.perf_counter()
         try:
             result = await asyncio.get_running_loop().run_in_executor(
                 self.session_mgr.query_executor, func, kwargs
             )
         except reader.SQLiteInterruptedError as error:
-            if self.env.track_metrics:
-                api.interrupt(start, error.metrics)
+            metrics.query_interrupt(start, error.metrics)
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
         except reader.SQLiteOperationalError as error:
-            if self.env.track_metrics:
-                api.error(start, error.metrics)
+            metrics.query_error(start, error.metrics)
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
         except:
-            if self.env.track_metrics:
-                api.error(start)
+            metrics.query_error(start, {})
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
         if self.env.track_metrics:
-            (result, metrics) = result
-            api.finish(start, metrics)
+            (result, metrics_data) = result
+            metrics.query_response(start, metrics_data)
         return base64.b64encode(result).decode()

     async def run_and_cache_query(self, query_name, function, kwargs):
+        if self.env.track_metrics:
+            metrics = self.session_mgr.metrics.for_api(query_name)
+        else:
+            metrics = APICallMetrics(query_name)
+        metrics.start()
         cache = self.session_mgr.search_cache[query_name]
         cache_key = str(kwargs)
         cache_item = cache.get(cache_key)
         if cache_item is None:
             cache_item = cache[cache_key] = ResultCacheItem()
         elif cache_item.result is not None:
-            self.session_mgr.metrics.for_api(query_name).cache_hit()
+            metrics.cache_response()
             return cache_item.result
         async with cache_item.lock:
-            result = cache_item.result
-            if result is None:
-                result = cache_item.result = await self.run_in_executor(
-                    query_name, function, kwargs
+            if cache_item.result is None:
+                cache_item.result = await self.run_in_executor(
+                    metrics, function, kwargs
                 )
             else:
-                self.session_mgr.metrics.for_api(query_name).cache_hit()
-            return result
+                metrics.cache_response()
+            return cache_item.result

     async def claimtrie_search(self, **kwargs):
         if kwargs:
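run_and_cache_query relies on a double-checked pattern: a lock-free fast path for warm cache entries, then a per-entry asyncio.Lock under which the result is re-checked, so concurrent misses on the same key run the query once and every other caller counts as a cache response. A condensed, self-contained sketch of just that pattern; cached_call and compute are hypothetical names, and the ResultCacheItem here is a minimal stand-in for the real class:

    import asyncio

    class ResultCacheItem:
        # a slot for the result plus a per-entry lock so concurrent misses coalesce
        def __init__(self):
            self.result = None
            self.lock = asyncio.Lock()

    async def cached_call(cache: dict, key: str, compute):
        item = cache.get(key)
        if item is None:
            item = cache[key] = ResultCacheItem()
        elif item.result is not None:
            return item.result                   # fast path, no lock taken
        async with item.lock:
            if item.result is None:              # re-check: another task may have filled it
                item.result = await compute()
            return item.result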


@@ -1,18 +1,19 @@
 import time
 import unittest
-from lbry.wallet.server.metrics import ServerLoadData, calculate_percentiles
+from lbry.wallet.server.metrics import ServerLoadData, calculate_avg_percentiles


 class TestPercentileCalculation(unittest.TestCase):

     def test_calculate_percentiles(self):
-        self.assertEqual(calculate_percentiles([]), (0, 0, 0, 0, 0, 0, 0))
-        self.assertEqual(calculate_percentiles([1]), (1, 1, 1, 1, 1, 1, 1))
-        self.assertEqual(calculate_percentiles([1, 2]), (1, 1, 1, 1, 2, 2, 2))
-        self.assertEqual(calculate_percentiles([1, 2, 3]), (1, 1, 1, 2, 3, 3, 3))
-        self.assertEqual(calculate_percentiles([4, 1, 2, 3]), (1, 1, 1, 2, 3, 4, 4))
-        self.assertEqual(calculate_percentiles([1, 2, 3, 4, 5, 6]), (1, 1, 2, 3, 5, 6, 6))
-        self.assertEqual(calculate_percentiles(list(range(1, 101))), (1, 5, 25, 50, 75, 95, 100))
+        self.assertEqual(calculate_avg_percentiles([]), (0, 0, 0, 0, 0, 0, 0, 0))
+        self.assertEqual(calculate_avg_percentiles([1]), (1, 1, 1, 1, 1, 1, 1, 1))
+        self.assertEqual(calculate_avg_percentiles([1, 2]), (1, 1, 1, 1, 1, 2, 2, 2))
+        self.assertEqual(calculate_avg_percentiles([1, 2, 3]), (2, 1, 1, 1, 2, 3, 3, 3))
+        self.assertEqual(calculate_avg_percentiles([4, 1, 2, 3]), (2, 1, 1, 1, 2, 3, 4, 4))
+        self.assertEqual(calculate_avg_percentiles([1, 2, 3, 4, 5, 6]), (3, 1, 1, 2, 3, 5, 6, 6))
+        self.assertEqual(calculate_avg_percentiles(
+            list(range(1, 101))), (50, 1, 5, 25, 50, 75, 95, 100))


 class TestCollectingMetrics(unittest.TestCase):
@@ -23,8 +24,8 @@ class TestCollectingMetrics(unittest.TestCase):
         search = load.for_api('search')
         self.assertEqual(search.name, 'search')
         search.start()
-        search.cache_hit()
-        search.cache_hit()
+        search.cache_response()
+        search.cache_response()
         metrics = {
             'search': [{'total': 40}],
             'execute_query': [
@@ -33,30 +34,27 @@ class TestCollectingMetrics(unittest.TestCase):
             ]
         }
         for x in range(5):
-            search.finish(time.perf_counter() - 0.055 + 0.001*x, metrics)
+            search.query_response(time.perf_counter() - 0.055 + 0.001*x, metrics)
         metrics['execute_query'][0]['total'] = 10
         metrics['execute_query'][0]['sql'] = "select lots, of, stuff FROM claim where something=1"
-        search.interrupt(time.perf_counter() - 0.050, metrics)
-        search.error(time.perf_counter() - 0.050, metrics)
-        search.error(time.perf_counter() - 0.052)
+        search.query_interrupt(time.perf_counter() - 0.050, metrics)
+        search.query_error(time.perf_counter() - 0.050, metrics)
+        search.query_error(time.perf_counter() - 0.052, {})
         self.assertEqual(load.to_json_and_reset({}), {'status': {}, 'api': {'search': {
-            'cache_hits_count': 2,
-            'errored_count': 2,
-            'errored_queries': ['FROM claim where something=1'],
-            'execution_avg': 12,
-            'execution_percentiles': (10, 10, 10, 10, 20, 20, 20),
-            'finished_count': 7,
-            'individual_queries_count': 14,
-            'individual_query_avg': 13,
-            'individual_query_percentiles': (10, 10, 10, 10, 20, 20, 20),
-            'interrupted_count': 0,
-            'interrupted_queries': ['FROM claim where something=1'],
-            'query_avg': 27,
-            'query_percentiles': (20, 20, 20, 30, 30, 30, 30),
-            'started_count': 1,
-            'total_avg': 52,
-            'total_percentiles': (50, 50, 50, 52, 54, 55, 55),
-            'wait_avg': 12,
-            'wait_percentiles': (10, 10, 10, 12, 14, 15, 15)
+            "receive_count": 1,
+            "cache_response_count": 2,
+            "query_response_count": 5,
+            "intrp_response_count": 1,
+            "error_response_count": 2,
+            "response": (53, 51, 51, 52, 53, 54, 55, 55),
+            "interrupt": (50, 50, 50, 50, 50, 50, 50, 50),
+            "error": (51, 50, 50, 50, 50, 52, 52, 52),
+            "python": (12, 10, 10, 10, 10, 20, 20, 20),
+            "wait": (12, 10, 10, 10, 12, 14, 15, 15),
+            "sql": (27, 20, 20, 20, 30, 30, 30, 30),
+            "individual_sql": (13, 10, 10, 10, 10, 20, 20, 20),
+            "individual_sql_count": 14,
+            "errored_queries": ['FROM claim where something=1'],
+            "interrupted_queries": ['FROM claim where something=1'],
         }}})
         self.assertEqual(load.to_json_and_reset({}), {'status': {}, 'api': {}})
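A quick cross-check of one expected value, derived from the fixture (approximate, since the test measures real elapsed time):

    # the seven "wait" samples are request elapsed minus the 40ms 'search' total:
    # five query_response calls elapse ~55, 54, 53, 52, 51ms -> waits 15, 14, 13, 12, 11
    # the interrupt and the first error elapse ~50ms         -> waits 10, 10
    # the second error passes {}, so it adds no python/wait/sql samples
    waits = sorted([15, 14, 13, 12, 11, 10, 10])   # [10, 10, 11, 12, 13, 14, 15]
    # avg 12, then min/5th/25th/50th/75th/95th/max -> (12, 10, 10, 10, 12, 14, 15, 15)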