From 046459a72de793883ead71e74fd3e1456ff1f57a Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Wed, 17 Jul 2019 21:50:20 -0400 Subject: [PATCH] refactored websockets and metrics --- lbry/lbry/wallet/server/db/reader.py | 32 +++--- lbry/lbry/wallet/server/db/writer.py | 2 +- lbry/lbry/wallet/server/session.py | 155 +++++---------------------- lbry/lbry/wallet/server/websocket.py | 55 ++++++++++ 4 files changed, 100 insertions(+), 144 deletions(-) create mode 100644 lbry/lbry/wallet/server/websocket.py diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py index e0c9ac450..e7f98813e 100644 --- a/lbry/lbry/wallet/server/db/reader.py +++ b/lbry/lbry/wallet/server/db/reader.py @@ -9,7 +9,7 @@ from contextvars import ContextVar from functools import wraps from dataclasses import dataclass -from torba.client.basedatabase import query +from torba.client.basedatabase import query, interpolate from lbry.schema.url import URL, normalize_name from lbry.schema.tags import clean_tags @@ -19,6 +19,12 @@ from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger from .common import CLAIM_TYPES, STREAM_TYPES +class SQLiteOperationalError(sqlite3.OperationalError): + def __init__(self, metrics): + super().__init__('sqlite query errored') + self.metrics = metrics + + class SQLiteInterruptedError(sqlite3.OperationalError): def __init__(self, metrics): super().__init__('sqlite query interrupted') @@ -91,7 +97,6 @@ ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx') def initializer(log, _path, _ledger_name, query_timeout, _measure=False): - db = sqlite3.connect(_path, isolation_level=None, uri=True) db.row_factory = sqlite3.Row ctx.set( @@ -114,21 +119,18 @@ def measure(func): state = ctx.get() if not state.is_tracking_metrics: return func(*args, **kwargs) - metric = state.metrics.setdefault(func.__name__, { - 'calls': 0, 'total': 0, 'isolated': 0, 'errors': 0 - }) + metric = {} + state.metrics.setdefault(func.__name__, []).append(metric) state.stack.append([]) start = time.perf_counter() try: return func(*args, **kwargs) except: - metric['errors'] += 1 raise finally: elapsed = int((time.perf_counter()-start)*1000) - metric['calls'] += 1 - metric['total'] += elapsed - metric['isolated'] += (elapsed-sum(state.stack.pop())) + metric['total'] = elapsed + metric['isolated'] = (elapsed-sum(state.stack.pop())) if state.stack: state.stack[-1].append(elapsed) return wrapper @@ -167,15 +169,16 @@ def execute_query(sql, values) -> List: try: return context.db.execute(sql, values).fetchall() except sqlite3.OperationalError as err: + plain_sql = interpolate(sql, values) + if not context.is_tracking_metrics: + context.metric['execute_query'][-1]['sql'] = plain_sql if str(err) == "interrupted": - query_str = sql - for k in sorted(values.keys(), reverse=True): - query_str = query_str.replace(f":{k}", str(values[k]) if not k.startswith("$") else f"'{values[k]}'") - context.log.warning("interrupted slow sqlite query:\n%s", query_str) + context.log.warning("interrupted slow sqlite query:\n%s", plain_sql) raise SQLiteInterruptedError(context.metrics) + context.log.exception('failed running query', exc_info=err) + raise SQLiteOperationalError(context.metrics) -@measure def get_claims(cols, for_count=False, **constraints) -> List: if 'order_by' in constraints: sql_order_by = [] @@ -420,7 +423,6 @@ def resolve_url(raw_url): return channel -@measure def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) diff --git a/lbry/lbry/wallet/server/db/writer.py b/lbry/lbry/wallet/server/db/writer.py index f8aa93820..31d4dff22 100644 --- a/lbry/lbry/wallet/server/db/writer.py +++ b/lbry/lbry/wallet/server/db/writer.py @@ -257,7 +257,7 @@ class SQLDB: if tags: self.db.executemany( - "INSERT INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values() + "INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values() ) return claims diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index 5afb99530..64ff7a457 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -4,13 +4,9 @@ import time import base64 import asyncio from binascii import hexlify -from weakref import WeakSet from pylru import lrucache from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor -from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite -from aiohttp.http_websocket import WSMsgType, WSCloseCode - from torba.rpc.jsonrpc import RPCError, JSONRPC from torba.server.session import ElectrumX, SessionManager from torba.server import util @@ -18,56 +14,8 @@ from torba.server import util from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.db.writer import LBRYDB from lbry.wallet.server.db import reader - - -class AdminWebSocket: - - def __init__(self, manager): - self.manager = manager - self.app = Application() - self.app['websockets'] = WeakSet() - self.app.router.add_get('/', self.on_connect) - self.app.on_shutdown.append(self.on_shutdown) - self.runner = AppRunner(self.app) - - async def on_status(self, _): - if not self.app['websockets']: - return - self.send_message({ - 'type': 'status', - 'height': self.manager.daemon.cached_height(), - }) - - def send_message(self, msg): - for web_socket in self.app['websockets']: - asyncio.create_task(web_socket.send_json(msg)) - - async def start(self): - await self.runner.setup() - await TCPSite(self.runner, self.manager.env.websocket_host, self.manager.env.websocket_port).start() - - async def stop(self): - await self.runner.cleanup() - - async def on_connect(self, request): - web_socket = WebSocketResponse() - await web_socket.prepare(request) - self.app['websockets'].add(web_socket) - try: - async for msg in web_socket: - if msg.type == WSMsgType.TEXT: - await self.on_status(None) - elif msg.type == WSMsgType.ERROR: - print('web socket connection closed with exception %s' % - web_socket.exception()) - finally: - self.app['websockets'].discard(web_socket) - return web_socket - - @staticmethod - async def on_shutdown(app): - for web_socket in set(app['websockets']): - await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown') +from lbry.wallet.server.websocket import AdminWebSocket +from lbry.wallet.server.metrics import ServerLoadData class ResultCacheItem: @@ -95,9 +43,8 @@ class LBRYSessionManager(SessionManager): super().__init__(*args, **kwargs) self.query_executor = None self.websocket = None - self.metrics_processor = None - self.command_metrics = {} - self.reader_metrics = {} + self.metrics = ServerLoadData() + self.metrics_loop = None self.running = False if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) @@ -105,69 +52,11 @@ class LBRYSessionManager(SessionManager): self.search_cache['search'] = lrucache(10000) self.search_cache['resolve'] = lrucache(10000) - def get_command_tracking_info(self, command): - if command not in self.command_metrics: - self.command_metrics[command] = { - 'cache_hit': 0, - 'started': 0, - 'finished': 0, - 'total_time': 0, - 'execution_time': 0, - 'query_time': 0, - 'query_count': 0, - 'interrupted': 0, - 'interrupted_query_values': [], - } - return self.command_metrics[command] - - def cache_hit(self, command_name): - if self.env.track_metrics: - command = self.get_command_tracking_info(command_name) - command['cache_hit'] += 1 - - def start_command_tracking(self, command_name): - if self.env.track_metrics: - command = self.get_command_tracking_info(command_name) - command['started'] += 1 - - def finish_command_tracking(self, command_name, elapsed, metrics): - if self.env.track_metrics: - command = self.get_command_tracking_info(command_name) - command['finished'] += 1 - command['total_time'] += elapsed - if 'execute_query' in metrics: - command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total']) - command['query_time'] += metrics['execute_query']['total'] - command['query_count'] += metrics['execute_query']['calls'] - for func_name, func_metrics in metrics.items(): - reader = self.reader_metrics.setdefault(func_name, {}) - for key in func_metrics: - if key not in reader: - reader[key] = func_metrics[key] - else: - reader[key] += func_metrics[key] - - def interrupted_command_error(self, command_name, elapsed, metrics, kwargs): - if self.env.track_metrics: - command = self.get_command_tracking_info(command_name) - command['finished'] += 1 - command['interrupted'] += 1 - command['total_time'] += elapsed - command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total']) - command['query_time'] += metrics['execute_query']['total'] - command['query_count'] += metrics['execute_query']['calls'] - if len(command['interrupted_query_values']) < 100: - command['interrupted_query_values'].append(kwargs) - async def process_metrics(self): while self.running: - commands, self.command_metrics = self.command_metrics, {} - reader, self.reader_metrics = self.reader_metrics, {} + data = self.metrics.to_json_and_reset({'sessions': self.session_count()}) if self.websocket is not None: - self.websocket.send_message({ - 'commands': commands, - 'reader': reader - }) + self.websocket.send_message(data) await asyncio.sleep(1) async def start_other(self): @@ -186,12 +75,12 @@ class LBRYSessionManager(SessionManager): if self.websocket is not None: await self.websocket.start() if self.env.track_metrics: - self.metrics_processor = asyncio.create_task(self.process_metrics()) + self.metrics_loop = asyncio.create_task(self.process_metrics()) async def stop_other(self): self.running = False if self.env.track_metrics: - self.metrics_processor.cancel() + self.metrics_loop.cancel() if self.websocket is not None: await self.websocket.stop() self.query_executor.shutdown() @@ -221,24 +110,34 @@ class LBRYElectrumX(ElectrumX): self.request_handlers.update(handlers) async def run_in_executor(self, name, func, kwargs): - start = None + result = start = api = None + if self.env.track_metrics: - self.session_mgr.start_command_tracking(name) + api = self.session_mgr.metrics.for_api(name) + api.start() start = time.perf_counter() + try: result = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, func, kwargs ) except reader.SQLiteInterruptedError as error: - self.session_mgr.interrupted_command_error( - name, int((time.perf_counter() - start) * 1000), error.metrics, kwargs - ) + if self.env.track_metrics: + api.interrupt(start, error.metrics) raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') + except reader.SQLiteOperationalError as error: + if self.env.track_metrics: + api.error(start, error.metrics) + raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute') + except: + if self.env.track_metrics: + api.error(start) + raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') if self.env.track_metrics: - elapsed = int((time.perf_counter() - start) * 1000) (result, metrics) = result - self.session_mgr.finish_command_tracking(name, elapsed, metrics) + api.finish(start, metrics) + return base64.b64encode(result).decode() async def run_and_cache_query(self, query_name, function, kwargs): @@ -248,7 +147,7 @@ class LBRYElectrumX(ElectrumX): if cache_item is None: cache_item = cache[cache_key] = ResultCacheItem() elif cache_item.result is not None: - self.session_mgr.cache_hit(query_name) + self.session_mgr.metrics.for_api(query_name).cache_hit() return cache_item.result async with cache_item.lock: result = cache_item.result @@ -257,7 +156,7 @@ class LBRYElectrumX(ElectrumX): query_name, function, kwargs ) else: - self.session_mgr.cache_hit(query_name) + self.session_mgr.metrics.for_api(query_name).cache_hit() return result async def claimtrie_search(self, **kwargs): diff --git a/lbry/lbry/wallet/server/websocket.py b/lbry/lbry/wallet/server/websocket.py new file mode 100644 index 000000000..9620918cb --- /dev/null +++ b/lbry/lbry/wallet/server/websocket.py @@ -0,0 +1,55 @@ +import asyncio +from weakref import WeakSet + +from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite +from aiohttp.http_websocket import WSMsgType, WSCloseCode + + +class AdminWebSocket: + + def __init__(self, manager): + self.manager = manager + self.app = Application() + self.app['websockets'] = WeakSet() + self.app.router.add_get('/', self.on_connect) + self.app.on_shutdown.append(self.on_shutdown) + self.runner = AppRunner(self.app) + + async def on_status(self, _): + if not self.app['websockets']: + return + self.send_message({ + 'type': 'status', + 'height': self.manager.daemon.cached_height(), + }) + + def send_message(self, msg): + for web_socket in self.app['websockets']: + asyncio.create_task(web_socket.send_json(msg)) + + async def start(self): + await self.runner.setup() + await TCPSite(self.runner, self.manager.env.websocket_host, self.manager.env.websocket_port).start() + + async def stop(self): + await self.runner.cleanup() + + async def on_connect(self, request): + web_socket = WebSocketResponse() + await web_socket.prepare(request) + self.app['websockets'].add(web_socket) + try: + async for msg in web_socket: + if msg.type == WSMsgType.TEXT: + await self.on_status(None) + elif msg.type == WSMsgType.ERROR: + print('web socket connection closed with exception %s' % + web_socket.exception()) + finally: + self.app['websockets'].discard(web_socket) + return web_socket + + @staticmethod + async def on_shutdown(app): + for web_socket in set(app['websockets']): + await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')