diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py
index be0152b9a..032ca8355 100644
--- a/lbry/lbry/wallet/server/db/reader.py
+++ b/lbry/lbry/wallet/server/db/reader.py
@@ -169,7 +169,7 @@ def execute_query(sql, values) -> List:
     except sqlite3.OperationalError as err:
         plain_sql = interpolate(sql, values)
         if context.is_tracking_metrics:
-            context.metric['execute_query'][-1]['sql'] = plain_sql
+            context.metrics['execute_query'][-1]['sql'] = plain_sql
         if str(err) == "interrupted":
             context.log.warning("interrupted slow sqlite query:\n%s", plain_sql)
             raise SQLiteInterruptedError(context.metrics)
diff --git a/lbry/lbry/wallet/server/metrics.py b/lbry/lbry/wallet/server/metrics.py
index 95fd960bc..622999de3 100644
--- a/lbry/lbry/wallet/server/metrics.py
+++ b/lbry/lbry/wallet/server/metrics.py
@@ -7,12 +7,13 @@ def calculate_elapsed(start) -> int:
     return int((time.perf_counter() - start) * 1000)


-def calculate_percentiles(data) -> Tuple[int, int, int, int, int, int, int]:
+def calculate_avg_percentiles(data) -> Tuple[int, int, int, int, int, int, int, int]:
     if not data:
-        return 0, 0, 0, 0, 0, 0, 0
+        return 0, 0, 0, 0, 0, 0, 0, 0
     data.sort()
     size = len(data)
     return (
+        int(sum(data) / size),
         data[0],
         data[math.ceil(size * .05) - 1],
         data[math.ceil(size * .25) - 1],
@@ -23,10 +24,6 @@ def calculate_percentiles(data) -> Tuple[int, int, int, int, int, int, int]:
     )


-def avg(data) -> int:
-    return int(sum(data) / len(data)) if data else 0
-
-
 def remove_select_list(sql) -> str:
     return sql[sql.index('FROM'):]

@@ -35,79 +32,87 @@ class APICallMetrics:

     def __init__(self, name):
         self.name = name
-        # total counts
-        self.cache_hits = 0
-        self.started = 0
-        self.errored = 0
+
+        # total requests received
+        self.receive_count = 0
+        self.cache_response_count = 0
+
+        # millisecond timings for query based responses
+        self.query_response_times = []
+        self.query_intrp_times = []
+        self.query_error_times = []
+
+        self.query_python_times = []
+        self.query_wait_times = []
+        self.query_sql_times = []  # aggregate total of multiple SQL calls made per request
+
+        self.individual_sql_times = []  # every SQL query run on server
+
+        # actual queries
         self.errored_queries = set()
-        self.interrupted = 0
         self.interrupted_queries = set()
-        # timings
-        self.command_total_times = []
-        self.command_query_times = []
-        self.command_execution_times = []
-        self.command_wait_times = []
-        self.individual_query_times = []

     def to_json_and_reset(self):
         return {
-            # total counts
-            "cache_hits_count": self.cache_hits,
-            "started_count": self.started,
-            "finished_count": len(self.command_total_times),
-            "errored_count": self.errored,
+            # total requests received
+            "receive_count": self.receive_count,
+            # sum of these is total responses made
+            "cache_response_count": self.cache_response_count,
+            "query_response_count": len(self.query_response_times),
+            "intrp_response_count": len(self.query_intrp_times),
+            "error_response_count": len(self.query_error_times),
+            # millisecond timings for non-cache responses
+            "response": calculate_avg_percentiles(self.query_response_times),
+            "interrupt": calculate_avg_percentiles(self.query_intrp_times),
+            "error": calculate_avg_percentiles(self.query_error_times),
+            # response, interrupt and error each also report the python, wait and sql stats:
+            "python": calculate_avg_percentiles(self.query_python_times),
+            "wait": calculate_avg_percentiles(self.query_wait_times),
+            "sql": calculate_avg_percentiles(self.query_sql_times),
+            # extended timings for individual sql executions
+            "individual_sql": calculate_avg_percentiles(self.individual_sql_times),
+            "individual_sql_count": len(self.individual_sql_times),
+            # actual queries
             "errored_queries": list(self.errored_queries),
-            "interrupted_count": self.interrupted,
             "interrupted_queries": list(self.interrupted_queries),
-            "individual_queries_count": len(self.individual_query_times),
-            # timings and percentiles
-            "total_avg": avg(self.command_total_times),
-            "total_percentiles": calculate_percentiles(self.command_total_times),
-            "query_avg": avg(self.command_query_times),
-            "query_percentiles": calculate_percentiles(self.command_query_times),
-            "execution_avg": avg(self.command_execution_times),
-            "execution_percentiles": calculate_percentiles(self.command_execution_times),
-            "wait_avg": avg(self.command_wait_times),
-            "wait_percentiles": calculate_percentiles(self.command_wait_times),
-            "individual_query_avg": avg(self.individual_query_times),
-            "individual_query_percentiles": calculate_percentiles(self.individual_query_times),
         }

-    def cache_hit(self):
-        self.cache_hits += 1
-
     def start(self):
-        self.started += 1
+        self.receive_count += 1

-    def finish(self, start, metrics):
-        self.command_total_times.append(calculate_elapsed(start))
+    def cache_response(self):
+        self.cache_response_count += 1
+
+    def _add_query_timings(self, request_total_time, metrics):
         if metrics and 'execute_query' in metrics:
-            query_times = [f['total'] for f in metrics['execute_query']]
-            self.individual_query_times.extend(query_times)
-            command_query_time = sum(query_times)
-            self.command_query_times.append(command_query_time)
-            self.command_execution_times.append(
-                metrics[self.name][0]['total'] - command_query_time
-            )
-            self.command_wait_times.append(
-                self.command_total_times[-1] - metrics[self.name][0]['total']
-            )
+            sub_process_total = metrics[self.name][0]['total']
+            individual_query_times = [f['total'] for f in metrics['execute_query']]
+            aggregated_query_time = sum(individual_query_times)
+            self.individual_sql_times.extend(individual_query_times)
+            self.query_sql_times.append(aggregated_query_time)
+            self.query_python_times.append(sub_process_total - aggregated_query_time)
+            self.query_wait_times.append(request_total_time - sub_process_total)

-    def _add_queries(self, metrics, query_set):
+    @staticmethod
+    def _add_queries(query_set, metrics):
         if metrics and 'execute_query' in metrics:
             for execute_query in metrics['execute_query']:
                 if 'sql' in execute_query:
                     query_set.add(remove_select_list(execute_query['sql']))

-    def interrupt(self, start, metrics):
-        self.finish(start, metrics)
-        self._add_queries(metrics, self.interrupted_queries)
+    def query_response(self, start, metrics):
+        self.query_response_times.append(calculate_elapsed(start))
+        self._add_query_timings(self.query_response_times[-1], metrics)

-    def error(self, start, metrics=None):
-        self.errored += 1
-        if metrics:
-            self.finish(start, metrics)
-            self._add_queries(metrics, self.errored_queries)
+    def query_interrupt(self, start, metrics):
+        self.query_intrp_times.append(calculate_elapsed(start))
+        self._add_queries(self.interrupted_queries, metrics)
+        self._add_query_timings(self.query_intrp_times[-1], metrics)
+
+    def query_error(self, start, metrics):
+        self.query_error_times.append(calculate_elapsed(start))
+        self._add_queries(self.errored_queries, metrics)
+        self._add_query_timings(self.query_error_times[-1], metrics)


 class ServerLoadData:
diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py
index 64ff7a457..7b861c934 100644
--- a/lbry/lbry/wallet/server/session.py
+++ b/lbry/lbry/wallet/server/session.py
@@ -15,7 +15,7 @@ from lbry.wallet.server.block_processor import LBRYBlockProcessor
 from lbry.wallet.server.db.writer import LBRYDB
 from lbry.wallet.server.db import reader
 from lbry.wallet.server.websocket import AdminWebSocket
-from lbry.wallet.server.metrics import ServerLoadData
+from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics


 class ResultCacheItem:
@@ -109,55 +109,50 @@ class LBRYElectrumX(ElectrumX):
         }
         self.request_handlers.update(handlers)

-    async def run_in_executor(self, name, func, kwargs):
-        result = start = api = None
-
-        if self.env.track_metrics:
-            api = self.session_mgr.metrics.for_api(name)
-            api.start()
-        start = time.perf_counter()
-
+    async def run_in_executor(self, metrics: APICallMetrics, func, kwargs):
+        start = time.perf_counter()
         try:
             result = await asyncio.get_running_loop().run_in_executor(
                 self.session_mgr.query_executor, func, kwargs
             )
         except reader.SQLiteInterruptedError as error:
-            if self.env.track_metrics:
-                api.interrupt(start, error.metrics)
+            metrics.query_interrupt(start, error.metrics)
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
         except reader.SQLiteOperationalError as error:
-            if self.env.track_metrics:
-                api.error(start, error.metrics)
+            metrics.query_error(start, error.metrics)
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
         except:
-            if self.env.track_metrics:
-                api.error(start)
+            metrics.query_error(start, {})
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')

         if self.env.track_metrics:
-            (result, metrics) = result
-            api.finish(start, metrics)
+            (result, metrics_data) = result
+            metrics.query_response(start, metrics_data)

         return base64.b64encode(result).decode()

     async def run_and_cache_query(self, query_name, function, kwargs):
+        if self.env.track_metrics:
+            metrics = self.session_mgr.metrics.for_api(query_name)
+        else:
+            metrics = APICallMetrics(query_name)
+        metrics.start()
         cache = self.session_mgr.search_cache[query_name]
         cache_key = str(kwargs)
         cache_item = cache.get(cache_key)
         if cache_item is None:
             cache_item = cache[cache_key] = ResultCacheItem()
         elif cache_item.result is not None:
-            self.session_mgr.metrics.for_api(query_name).cache_hit()
+            metrics.cache_response()
             return cache_item.result
         async with cache_item.lock:
-            result = cache_item.result
-            if result is None:
-                result = cache_item.result = await self.run_in_executor(
-                    query_name, function, kwargs
+            if cache_item.result is None:
+                cache_item.result = await self.run_in_executor(
+                    metrics, function, kwargs
                 )
             else:
-                self.session_mgr.metrics.for_api(query_name).cache_hit()
-            return result
+                metrics.cache_response()
+            return cache_item.result

     async def claimtrie_search(self, **kwargs):
         if kwargs:
diff --git a/lbry/tests/unit/wallet/server/test_metrics.py b/lbry/tests/unit/wallet/server/test_metrics.py
index 44438e9f4..c66d136fd 100644
--- a/lbry/tests/unit/wallet/server/test_metrics.py
+++ b/lbry/tests/unit/wallet/server/test_metrics.py
@@ -1,18 +1,19 @@
 import time
 import unittest
-from lbry.wallet.server.metrics import ServerLoadData, calculate_percentiles
+from lbry.wallet.server.metrics import ServerLoadData, calculate_avg_percentiles


 class TestPercentileCalculation(unittest.TestCase):

     def test_calculate_percentiles(self):
-        self.assertEqual(calculate_percentiles([]), (0, 0, 0, 0, 0, 0, 0))
-        self.assertEqual(calculate_percentiles([1]), (1, 1, 1, 1, 1, 1, 1))
-        self.assertEqual(calculate_percentiles([1, 2]), (1, 1, 1, 1, 2, 2, 2))
-        self.assertEqual(calculate_percentiles([1, 2, 3]), (1, 1, 1, 2, 3, 3, 3))
-        self.assertEqual(calculate_percentiles([4, 1, 2, 3]), (1, 1, 1, 2, 3, 4, 4))
-        self.assertEqual(calculate_percentiles([1, 2, 3, 4, 5, 6]), (1, 1, 2, 3, 5, 6, 6))
-        self.assertEqual(calculate_percentiles(list(range(1, 101))), (1, 5, 25, 50, 75, 95, 100))
+        self.assertEqual(calculate_avg_percentiles([]), (0, 0, 0, 0, 0, 0, 0, 0))
+        self.assertEqual(calculate_avg_percentiles([1]), (1, 1, 1, 1, 1, 1, 1, 1))
+        self.assertEqual(calculate_avg_percentiles([1, 2]), (1, 1, 1, 1, 1, 2, 2, 2))
+        self.assertEqual(calculate_avg_percentiles([1, 2, 3]), (2, 1, 1, 1, 2, 3, 3, 3))
+        self.assertEqual(calculate_avg_percentiles([4, 1, 2, 3]), (2, 1, 1, 1, 2, 3, 4, 4))
+        self.assertEqual(calculate_avg_percentiles([1, 2, 3, 4, 5, 6]), (3, 1, 1, 2, 3, 5, 6, 6))
+        self.assertEqual(calculate_avg_percentiles(
+            list(range(1, 101))), (50, 1, 5, 25, 50, 75, 95, 100))


 class TestCollectingMetrics(unittest.TestCase):
@@ -23,8 +24,8 @@ class TestCollectingMetrics(unittest.TestCase):
         search = load.for_api('search')
         self.assertEqual(search.name, 'search')
         search.start()
-        search.cache_hit()
-        search.cache_hit()
+        search.cache_response()
+        search.cache_response()
         metrics = {
             'search': [{'total': 40}],
             'execute_query': [
@@ -33,30 +34,27 @@ class TestCollectingMetrics(unittest.TestCase):
             ]
         }
         for x in range(5):
-            search.finish(time.perf_counter() - 0.055 + 0.001*x, metrics)
+            search.query_response(time.perf_counter() - 0.055 + 0.001*x, metrics)
         metrics['execute_query'][0]['total'] = 10
         metrics['execute_query'][0]['sql'] = "select lots, of, stuff FROM claim where something=1"
-        search.interrupt(time.perf_counter() - 0.050, metrics)
-        search.error(time.perf_counter() - 0.050, metrics)
-        search.error(time.perf_counter() - 0.052)
+        search.query_interrupt(time.perf_counter() - 0.050, metrics)
+        search.query_error(time.perf_counter() - 0.050, metrics)
+        search.query_error(time.perf_counter() - 0.052, {})
         self.assertEqual(load.to_json_and_reset({}), {'status': {}, 'api': {'search': {
-            'cache_hits_count': 2,
-            'errored_count': 2,
-            'errored_queries': ['FROM claim where something=1'],
-            'execution_avg': 12,
-            'execution_percentiles': (10, 10, 10, 10, 20, 20, 20),
-            'finished_count': 7,
-            'individual_queries_count': 14,
-            'individual_query_avg': 13,
-            'individual_query_percentiles': (10, 10, 10, 10, 20, 20, 20),
-            'interrupted_count': 0,
-            'interrupted_queries': ['FROM claim where something=1'],
-            'query_avg': 27,
-            'query_percentiles': (20, 20, 20, 30, 30, 30, 30),
-            'started_count': 1,
-            'total_avg': 52,
-            'total_percentiles': (50, 50, 50, 52, 54, 55, 55),
-            'wait_avg': 12,
-            'wait_percentiles': (10, 10, 10, 12, 14, 15, 15)
+            "receive_count": 1,
+            "cache_response_count": 2,
+            "query_response_count": 5,
+            "intrp_response_count": 1,
+            "error_response_count": 2,
+            "response": (53, 51, 51, 52, 53, 54, 55, 55),
+            "interrupt": (50, 50, 50, 50, 50, 50, 50, 50),
+            "error": (51, 50, 50, 50, 50, 52, 52, 52),
+            "python": (12, 10, 10, 10, 10, 20, 20, 20),
+            "wait": (12, 10, 10, 10, 12, 14, 15, 15),
+            "sql": (27, 20, 20, 20, 30, 30, 30, 30),
+            "individual_sql": (13, 10, 10, 10, 10, 20, 20, 20),
+            "individual_sql_count": 14,
+            "errored_queries": ['FROM claim where something=1'],
+            "interrupted_queries": ['FROM claim where something=1'],
         }}})
         self.assertEqual(load.to_json_and_reset({}), {'status': {}, 'api': {}})
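
For reviewers: the reshaped `calculate_avg_percentiles` folds the old standalone `avg()` helper into the percentile tuple, so every timing stat now serializes as a single 8-tuple of `(avg, min, 5th, 25th, 50th, 75th, 95th, max)` in milliseconds. Below is a minimal standalone sketch of the new helper; note that the hunk above truncates after the 25th-percentile index, so the `.50`/`.75`/`.95`/max lines here are inferred from the unchanged context between hunks and the updated test expectations, not shown verbatim in the patch.

```python
# Standalone sketch mirroring the patched metrics.py helper.
import math
from typing import Tuple


def calculate_avg_percentiles(data) -> Tuple[int, int, int, int, int, int, int, int]:
    # returns (avg, min, 5th, 25th, 50th, 75th, 95th, max), all in milliseconds
    if not data:
        return 0, 0, 0, 0, 0, 0, 0, 0
    data.sort()
    size = len(data)
    return (
        int(sum(data) / size),
        data[0],
        data[math.ceil(size * .05) - 1],
        data[math.ceil(size * .25) - 1],
        data[math.ceil(size * .50) - 1],  # inferred: continues the pattern above
        data[math.ceil(size * .75) - 1],  # inferred
        data[math.ceil(size * .95) - 1],  # inferred
        data[-1],                         # inferred: max
    )


# Agrees with the updated unit test: average first, then the seven percentiles.
assert calculate_avg_percentiles(list(range(1, 101))) == (50, 1, 5, 25, 50, 75, 95, 100)
```

Folding the average into the tuple is what lets `to_json_and_reset` drop all the separate `*_avg` keys along with the `avg()` helper: `response`, `interrupt`, `error`, `python`, `wait`, `sql` and `individual_sql` each report average and percentiles in one value.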