From db3f9af174f2b68f6a5d4b38395635f8ffbeca28 Mon Sep 17 00:00:00 2001
From: Lex Berezhny
Date: Sat, 13 Jul 2019 00:34:40 -0400
Subject: [PATCH] refactored metrics gathering for wallet server

---
 lbry/lbry/wallet/server/db/reader.py | 176 ++++++++++++++++++---------
 lbry/lbry/wallet/server/session.py   | 119 ++++++++++++------
 2 files changed, 196 insertions(+), 99 deletions(-)
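
Note (commentary in the scrap region after the cut line; not part of the diff):
the new `measure` decorator in reader.py books each decorated call twice:
'total' is wall-clock time including nested measured calls, while 'isolated'
subtracts time spent in children, which each call reports by appending its
elapsed milliseconds to the parent's entry on `state.stack`. A minimal sketch
of that accounting, using made-up `inner`/`outer` functions and assuming
`initializer()` already ran in this process with `_measure=True`:

    import time

    @measure
    def inner():
        time.sleep(0.02)   # ~20ms booked to inner

    @measure
    def outer():
        time.sleep(0.01)   # ~10ms of outer's own work
        inner()            # counted in outer's 'total' but not its 'isolated'

    outer()
    # ctx.get().metrics is now roughly:
    # {'outer': {'calls': 1, 'total': 30, 'isolated': 10, 'errors': 0},
    #  'inner': {'calls': 1, 'total': 20, 'isolated': 20, 'errors': 0}}
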
diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py
index 220f4eef2..820a970f5 100644
--- a/lbry/lbry/wallet/server/db/reader.py
+++ b/lbry/lbry/wallet/server/db/reader.py
@@ -1,16 +1,19 @@
 import time
 import struct
 import sqlite3
-from typing import Tuple, List, Any
+from typing import Tuple, List, Dict, Union, Type, Optional
 from binascii import unhexlify
 from decimal import Decimal
 from contextvars import ContextVar
+from functools import wraps
+from dataclasses import dataclass
 
 from torba.client.basedatabase import query
 
 from lbry.schema.url import URL, normalize_name
 from lbry.schema.tags import clean_tags
 from lbry.schema.result import Outputs
+from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger
 
 from .common import CLAIM_TYPES, STREAM_TYPES
 
@@ -49,25 +52,96 @@ PRAGMAS = """
 """
 
 
-db = ContextVar('db')
-ledger = ContextVar('ledger')
+@dataclass
+class ReaderState:
+    db: sqlite3.Connection
+    stack: List[List]
+    metrics: Dict
+    is_tracking_metrics: bool
+    ledger: Type[BaseLedger]
+
+    def close(self):
+        self.db.close()
+
+    def reset_metrics(self):
+        self.stack = []
+        self.metrics = {}
 
 
-def initializer(_path, _ledger_name):
-    _db = sqlite3.connect(_path, isolation_level=None, uri=True)
-    _db.row_factory = sqlite3.Row
-    db.set(_db)
-    from lbry.wallet.ledger import MainNetLedger, RegTestLedger
-    ledger.set(MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger)
+ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
+
+
+def initializer(_path, _ledger_name, _measure=False):
+    db = sqlite3.connect(_path, isolation_level=None, uri=True)
+    db.row_factory = sqlite3.Row
+    ctx.set(ReaderState(
+        db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
+        ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger
+    ))
 
 
 def cleanup():
-    db.get().close()
-    db.set(None)
-    ledger.set(None)
+    ctx.get().close()
+    ctx.set(None)
 
 
-def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
+def measure(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        state = ctx.get()
+        if not state.is_tracking_metrics:
+            return func(*args, **kwargs)
+        metric = state.metrics.setdefault(func.__name__, {'calls': 0, 'total': 0, 'isolated': 0, 'errors': 0})
+        state.stack.append([])
+        start = time.perf_counter()
+        try:
+            return func(*args, **kwargs)
+        except Exception:
+            metric['errors'] += 1
+            raise
+        finally:
+            elapsed = int((time.perf_counter()-start)*1000)
+            metric['calls'] += 1
+            metric['total'] += elapsed
+            metric['isolated'] += (elapsed-sum(state.stack.pop()))
+            if state.stack:
+                state.stack[-1].append(elapsed)
+    return wrapper
+
+
+def reports_metrics(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        state = ctx.get()
+        if not state.is_tracking_metrics:
+            return func(*args, **kwargs)
+        state.reset_metrics()
+        r = func(*args, **kwargs)
+        return r, state.metrics
+    return wrapper
+
+
+@reports_metrics
+def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]:
+    return encode_result(search(constraints))
+
+
+@reports_metrics
+def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]:
+    return encode_result(resolve(urls))
+
+
+def encode_result(result):
+    return Outputs.to_bytes(*result)
+
+
+@measure
+def execute_query(sql, values) -> List:
+    return ctx.get().db.execute(sql, values).fetchall()
+
+
+@measure
+def get_claims(cols, for_count=False, **constraints) -> List:
     if 'order_by' in constraints:
         sql_order_by = []
         for order_by in constraints['order_by']:
@@ -121,15 +195,15 @@ def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
 
     if 'public_key_id' in constraints:
         constraints['claim.public_key_hash'] = sqlite3.Binary(
-            ledger.get().address_to_hash160(constraints.pop('public_key_id')))
+            ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id')))
 
     if 'channel' in constraints:
         channel_url = constraints.pop('channel')
-        match, _ = _resolve_one(channel_url)
+        match = resolve_url(channel_url)
         if isinstance(match, sqlite3.Row):
             constraints['channel_hash'] = match['claim_hash']
         else:
-            return [[0]] if cols == 'count(*)' else [], 0
+            return [[0]] if cols == 'count(*)' else []
     if 'channel_hash' in constraints:
         constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash'))
     if 'channel_ids' in constraints:
@@ -198,46 +272,38 @@ def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
         LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash)
         """, **constraints
     )
-    start = time.time()
-    result = db.get().execute(sql, values).fetchall()
-    return result, time.time()-start
+
+    return execute_query(sql, values)
 
 
-def get_claims_count(**constraints) -> Tuple[int, float]:
+@measure
+def get_claims_count(**constraints) -> int:
     constraints.pop('offset', None)
     constraints.pop('limit', None)
     constraints.pop('order_by', None)
-    count, elapsed = get_claims('count(*)', for_count=True, **constraints)
-    return count[0][0], elapsed
+    count = get_claims('count(*)', for_count=True, **constraints)
+    return count[0][0]
 
 
-def search(constraints) -> Tuple[List, List, int, int, dict]:
+@measure
+def search(constraints) -> Tuple[List, List, int, int]:
     assert set(constraints).issubset(SEARCH_PARAMS), \
         f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
-    _metrics = {}
     total = None
     if not constraints.pop('no_totals', False):
-        total, _metrics['count'] = get_claims_count(**constraints)
+        total = get_claims_count(**constraints)
     constraints['offset'] = abs(constraints.get('offset', 0))
     constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
     if 'order_by' not in constraints:
         constraints['order_by'] = ["height", "^name"]
-    txo_rows, _metrics['claim.search'] = _search(**constraints)
+    txo_rows = _search(**constraints)
     channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash'])
     extra_txo_rows = []
     if channel_hashes:
-        extra_txo_rows, _metrics['channel.search'] = _search(
+        extra_txo_rows = _search(
             **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}
        )
-    return txo_rows, extra_txo_rows, constraints['offset'], total, _metrics
-
-
-def search_to_bytes(constraints) -> (bytes, dict):
-    start = time.time()
-    *result, _metrics = search(constraints)
-    output = Outputs.to_bytes(*result)
-    _metrics['total'] = time.time()-start
-    return output, _metrics
+    return txo_rows, extra_txo_rows, constraints['offset'], total
 
 
 def _search(**constraints):
@@ -259,39 +325,30 @@ def _search(**constraints):
     )
 
 
-def resolve(urls) -> Tuple[List, List, dict]:
+@measure
+def resolve(urls) -> Tuple[List, List]:
     result = []
-    _metrics = {'urls': []}
     channel_hashes = set()
     for raw_url in urls:
-        match, metric = _resolve_one(raw_url)
+        match = resolve_url(raw_url)
         result.append(match)
-        _metrics['urls'].append(metric)
         if isinstance(match, sqlite3.Row) and match['channel_hash']:
             channel_hashes.add(match['channel_hash'])
     extra_txo_rows = []
     if channel_hashes:
-        extra_txo_rows, _metrics['channel.search'] = _search(
+        extra_txo_rows = _search(
             **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}
         )
-    return result, extra_txo_rows, _metrics
+    return result, extra_txo_rows
 
 
-def resolve_to_bytes(urls) -> Tuple[bytes, dict]:
-    start = time.time()
-    *result, _metrics = resolve(urls)
-    output = Outputs.to_bytes(*result)  # pylint: disable=E1120
-    _metrics['total'] = time.time()-start
-    return output, _metrics
-
-
-def _resolve_one(raw_url) -> Tuple[Any, dict]:
-    _metrics = {}
+@measure
+def resolve_url(raw_url):
     try:
         url = URL.parse(raw_url)
     except ValueError as e:
-        return e, _metrics
+        return e
 
     channel = None
 
@@ -301,11 +358,11 @@
             query['is_controlling'] = True
         else:
             query['order_by'] = ['^height']
-        matches, _metrics['channel.search'] = _search(**query, limit=1)
+        matches = _search(**query, limit=1)
         if matches:
             channel = matches[0]
         else:
-            return LookupError(f'Could not find channel in "{raw_url}".'), _metrics
+            return LookupError(f'Could not find channel in "{raw_url}".')
 
     if url.has_stream:
         query = url.stream.to_dict()
@@ -319,15 +376,16 @@
             query['signature_valid'] = 1
         elif set(query) == {'name'}:
             query['is_controlling'] = 1
-        matches, _metrics['stream.search'] = _search(**query, limit=1)
+        matches = _search(**query, limit=1)
         if matches:
-            return matches[0], _metrics
+            return matches[0]
         else:
-            return LookupError(f'Could not find stream in "{raw_url}".'), _metrics
+            return LookupError(f'Could not find stream in "{raw_url}".')
 
-    return channel, _metrics
+    return channel
 
 
+@measure
 def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
     any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
     if any_items:
diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py
index f267edd4c..cd0d3f0d7 100644
--- a/lbry/lbry/wallet/server/session.py
+++ b/lbry/lbry/wallet/server/session.py
@@ -1,5 +1,6 @@
 import os
 import math
+import time
 import base64
 import asyncio
 from binascii import hexlify
@@ -77,54 +78,81 @@ class LBRYSessionManager(SessionManager):
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.query_executor = None
-        if self.can_start_websocket:
+        self.websocket = None
+        self.metrics_processor = None
+        self.command_metrics = {}
+        self.reader_metrics = {}
+        self.running = False
+        if self.env.websocket_host is not None and self.env.websocket_port is not None:
             self.websocket = AdminWebSocket(self)
-            self.metric_sender = None
-            self.metrics = {
-                'search': 0,
-                'search_time': 0,
-                'resolve': 0,
-                'resolve_time': 0
+
+    def get_command_tracking_info(self, command):
+        if command not in self.command_metrics:
+            self.command_metrics[command] = {
+                'started': 0,
+                'finished': 0,
+                'total_time': 0,
+                'execution_time': 0,
+                'query_time': 0,
+                'query_count': 0,
             }
+        return self.command_metrics[command]
 
-    @property
-    def can_start_websocket(self):
-        return self.env.websocket_host is not None and self.env.websocket_port is not None
+    def start_command_tracking(self, command_name):
+        if self.env.track_metrics:
+            command = self.get_command_tracking_info(command_name)
+            command['started'] += 1
 
-    def add_metric(self, command, metric):
-        if self.can_start_websocket:
-            self.metrics[command] += 1
-            self.metrics[f'{command}_time'] += int(metric['total']*1000)
+    def finish_command_tracking(self, command_name, elapsed, metrics):
+        if self.env.track_metrics:
+            command = self.get_command_tracking_info(command_name)
+            command['finished'] += 1
+            command['total_time'] += elapsed
+            query_metrics = metrics.get('execute_query', {'total': 0, 'calls': 0})
+            command['execution_time'] += (metrics[command_name]['total'] - query_metrics['total'])
+            command['query_time'] += query_metrics['total']
+            command['query_count'] += query_metrics['calls']
+            for func_name, func_metrics in metrics.items():
+                reader = self.reader_metrics.setdefault(func_name, {})
+                for key in func_metrics:
+                    if key not in reader:
+                        reader[key] = func_metrics[key]
+                    else:
+                        reader[key] += func_metrics[key]
 
-    async def send_metrics(self):
+    async def process_metrics(self):
         while self.running:
-            metrics = self.metrics
-            self.metrics = {
-                'search': 0,
-                'search_time': 0,
-                'resolve': 0,
-                'resolve_time': 0
-            }
-            self.websocket.send_message(metrics)
+            commands, self.command_metrics = self.command_metrics, {}
+            reader, self.reader_metrics = self.reader_metrics, {}
+            if self.websocket is not None:
+                self.websocket.send_message({
+                    'commands': commands,
+                    'reader': reader
+                })
             await asyncio.sleep(1)
 
     async def start_other(self):
-        args = dict(initializer=reader.initializer, initargs=('claims.db', self.env.coin.NET))
+        self.running = True
+        args = dict(
+            initializer=reader.initializer,
+            initargs=('claims.db', self.env.coin.NET, self.env.track_metrics)
+        )
         if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
             self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
         else:
             self.query_executor = ProcessPoolExecutor(
                 max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args
             )
-        if self.can_start_websocket:
-            self.running = True
+        if self.websocket is not None:
             await self.websocket.start()
-            self.metric_sender = asyncio.create_task(self.send_metrics())
+        if self.env.track_metrics:
+            self.metrics_processor = asyncio.create_task(self.process_metrics())
 
     async def stop_other(self):
-        if self.can_start_websocket:
-            self.running = False
-            self.metric_sender.cancel()
+        self.running = False
+        if self.env.track_metrics:
+            self.metrics_processor.cancel()
+        if self.websocket is not None:
             await self.websocket.stop()
         self.query_executor.shutdown()
 
@@ -132,6 +160,7 @@ class LBRYSessionManager(SessionManager):
 class LBRYElectrumX(ElectrumX):
     PROTOCOL_MIN = (0, 0)  # temporary, for supporting 0.10 protocol
     max_errors = math.inf  # don't disconnect people for errors! let them happen...
+    session_mgr: LBRYSessionManager
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -151,21 +180,31 @@ class LBRYElectrumX(ElectrumX):
         }
         self.request_handlers.update(handlers)
 
+    async def run_in_executor(self, name, func, kwargs):
+        start = None
+        if self.env.track_metrics:
+            self.session_mgr.start_command_tracking(name)
+            start = time.perf_counter()
+        result = await asyncio.get_running_loop().run_in_executor(
+            self.session_mgr.query_executor, func, kwargs
+        )
+        if self.env.track_metrics:
+            elapsed = int((time.perf_counter() - start) * 1000)
+            result, metrics = result
+            self.session_mgr.finish_command_tracking(name, elapsed, metrics)
+        return result
+
     async def claimtrie_search(self, **kwargs):
         if 'claim_id' in kwargs:
             self.assert_claim_id(kwargs['claim_id'])
-        data, metrics = await asyncio.get_running_loop().run_in_executor(
-            self.session_mgr.query_executor, reader.search_to_bytes, kwargs
-        )
-        self.session_mgr.add_metric('search', metrics)
-        return base64.b64encode(data).decode()
+        return base64.b64encode(
+            await self.run_in_executor('search', reader.search_to_bytes, kwargs)
+        ).decode()
 
     async def claimtrie_resolve(self, *urls):
-        data, metrics = await asyncio.get_running_loop().run_in_executor(
-            self.session_mgr.query_executor, reader.resolve_to_bytes, urls
-        )
-        self.session_mgr.add_metric('resolve', metrics)
-        return base64.b64encode(data).decode()
+        return base64.b64encode(
+            await self.run_in_executor('resolve', reader.resolve_to_bytes, urls)
+        ).decode()
 
     async def get_server_height(self):
         return self.bp.height
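
Note (commentary after the diff; not part of the patch): with `track_metrics`
enabled, the `@reports_metrics` wrappers in reader.py return a
`(bytes, metrics)` tuple, which `LBRYElectrumX.run_in_executor` unpacks before
handing the metrics to `finish_command_tracking`. A rough sketch of exercising
the reader side on its own, with a placeholder database path and search
constraints:

    from lbry.wallet.server.db import reader

    # normally run once per worker by the executor's initializer
    reader.initializer('claims.db', 'mainnet', True)   # _measure=True

    data, metrics = reader.search_to_bytes({'name': 'test'})
    # `data` is the serialized result that the session base64-encodes for the
    # client; `metrics` maps function names to counters, e.g.
    # metrics['execute_query'] -> {'calls': ..., 'total': ..., 'isolated': ..., 'errors': ...}

    reader.cleanup()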