diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py index 03b9b210b..fd8b05d78 100644 --- a/lbry/lbry/wallet/server/db/reader.py +++ b/lbry/lbry/wallet/server/db/reader.py @@ -1,6 +1,7 @@ -import sqlite3 +import time import struct -from typing import Tuple, List +import sqlite3 +from typing import Tuple, List, Any from binascii import unhexlify from decimal import Decimal from contextvars import ContextVar @@ -66,7 +67,7 @@ def cleanup(): ledger.set(None) -def get_claims(cols, for_count=False, **constraints): +def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]: if 'order_by' in constraints: sql_order_by = [] for order_by in constraints['order_by']: @@ -124,11 +125,11 @@ def get_claims(cols, for_count=False, **constraints): if 'channel' in constraints: channel_url = constraints.pop('channel') - match = _resolve_one(channel_url) + match, _ = _resolve_one(channel_url) if isinstance(match, sqlite3.Row): constraints['channel_hash'] = match['claim_hash'] else: - return [[0]] if cols == 'count(*)' else [] + return [[0]] if cols == 'count(*)' else [], 0 if 'channel_hash' in constraints: constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash')) if 'channel_ids' in constraints: @@ -197,37 +198,46 @@ def get_claims(cols, for_count=False, **constraints): LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash) """, **constraints ) - return db.get().execute(sql, values).fetchall() + start = time.time() + result = db.get().execute(sql, values).fetchall() + return result, time.time()-start -def get_claims_count(**constraints): +def get_claims_count(**constraints) -> Tuple[int, float]: constraints.pop('offset', None) constraints.pop('limit', None) constraints.pop('order_by', None) - count = get_claims('count(*)', for_count=True, **constraints) - return count[0][0] + count, elapsed = get_claims('count(*)', for_count=True, **constraints) + return count[0][0], elapsed -def search(constraints) -> Tuple[List, List, int, int]: +def search(constraints) -> Tuple[List, List, int, int, dict]: assert set(constraints).issubset(SEARCH_PARAMS), \ f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}" + _metrics = {} total = None if not constraints.pop('no_totals', False): - total = get_claims_count(**constraints) + total, _metrics['count'] = get_claims_count(**constraints) constraints['offset'] = abs(constraints.get('offset', 0)) constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) if 'order_by' not in constraints: constraints['order_by'] = ["height", "^name"] - txo_rows = _search(**constraints) + txo_rows, _metrics['claim.search'] = _search(**constraints) channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash']) extra_txo_rows = [] if channel_hashes: - extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) - return txo_rows, extra_txo_rows, constraints['offset'], total + extra_txo_rows, _metrics['channel.search'] = _search( + **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]} + ) + return txo_rows, extra_txo_rows, constraints['offset'], total, _metrics -def search_to_bytes(constraints) -> bytes: - return Outputs.to_bytes(*search(constraints)) +def search_to_bytes(constraints) -> (bytes, dict): + start = time.time() + *result, _metrics = search(constraints) + output = Outputs.to_bytes(*result) + _metrics['total'] = time.time()-start + return output, _metrics def _search(**constraints): @@ -249,29 +259,39 @@ def _search(**constraints): ) -def resolve(urls) -> Tuple[List, List]: +def resolve(urls) -> Tuple[List, List, dict]: result = [] + _metrics = {'urls': []} channel_hashes = set() for raw_url in urls: - match = _resolve_one(raw_url) + match, metric = _resolve_one(raw_url) result.append(match) + _metrics['urls'].append(metric) if isinstance(match, sqlite3.Row) and match['channel_hash']: channel_hashes.add(match['channel_hash']) extra_txo_rows = [] if channel_hashes: - extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) - return result, extra_txo_rows + extra_txo_rows, _metrics['channel.search'] = _search( + **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]} + ) + return result, extra_txo_rows, _metrics -def resolve_to_bytes(urls) -> bytes: - return Outputs.to_bytes(*resolve(urls)) +def resolve_to_bytes(urls) -> Tuple[bytes, dict]: + start = time.time() + *result, _metrics = resolve(urls) + output = Outputs.to_bytes(*result) + _metrics['total'] = time.time()-start + return output, _metrics -def _resolve_one(raw_url): +def _resolve_one(raw_url) -> Tuple[Any, dict]: + _metrics = {} + try: url = URL.parse(raw_url) except ValueError as e: - return e + return e, _metrics channel = None @@ -281,11 +301,11 @@ def _resolve_one(raw_url): query['is_controlling'] = True else: query['order_by'] = ['^height'] - matches = _search(**query, limit=1) + matches, _metrics['channel.search'] = _search(**query, limit=1) if matches: channel = matches[0] else: - return LookupError(f'Could not find channel in "{raw_url}".') + return LookupError(f'Could not find channel in "{raw_url}".'), _metrics if url.has_stream: query = url.stream.to_dict() @@ -299,13 +319,13 @@ def _resolve_one(raw_url): query['signature_valid'] = 1 elif set(query) == {'name'}: query['is_controlling'] = 1 - matches = _search(**query, limit=1) + matches, _metrics['stream.search'] = _search(**query, limit=1) if matches: - return matches[0] + return matches[0], _metrics else: - return LookupError(f'Could not find stream in "{raw_url}".') + return LookupError(f'Could not find stream in "{raw_url}".'), _metrics - return channel + return channel, _metrics def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index de04d3389..f023aa028 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -3,6 +3,11 @@ import math import base64 import asyncio from binascii import hexlify +#from weakref import WeakSet +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor + +#from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite +#from aiohttp.http_websocket import WSMsgType, WSCloseCode from torba.rpc.jsonrpc import RPCError from torba.server.session import ElectrumX, SessionManager @@ -11,7 +16,60 @@ from torba.server import util from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.db.writer import LBRYDB from lbry.wallet.server.db import reader -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor + + +class AdminWebSocket: + + def __init__(self, manager): + self.manager = manager + self.app = Application() + self.app['websockets'] = WeakSet() + self.app.router.add_get('/', self.on_connect) + self.app.on_shutdown.append(self.on_shutdown) + self.runner = AppRunner(self.app) + + async def on_status(self, _): + if not self.app['websockets']: + return + self.send_message({ + 'type': 'status', + 'height': self.manager.daemon.cached_height(), + }) + + def send_message(self, msg): + for web_socket in self.app['websockets']: + asyncio.create_task(web_socket.send_json(msg)) + + async def start(self): + await self.runner.setup() + await TCPSite(self.runner, self.manager.env.websocket_host, self.manager.env.websocket_port).start() + print('started websocket') + + async def stop(self): + await self.runner.cleanup() + print('stopped websocket') + + async def on_connect(self, request): + web_socket = WebSocketResponse() + await web_socket.prepare(request) + self.app['websockets'].add(web_socket) + try: + async for msg in web_socket: + if msg.type == WSMsgType.TEXT: + print(msg.data) + await self.on_status(None) + elif msg.type == WSMsgType.ERROR: + print('web socket connection closed with exception %s' % + web_socket.exception()) + finally: + self.app['websockets'].discard(web_socket) + return web_socket + + @staticmethod + async def on_shutdown(app): + print('disconnecting websockets') + for web_socket in app['websockets']: + await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown') class LBRYSessionManager(SessionManager): @@ -19,6 +77,36 @@ class LBRYSessionManager(SessionManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.query_executor = None + if self.can_start_websocket: + self.websocket = AdminWebSocket(self) + self.metric_sender = None + self.metrics = { + 'search': 0, + 'search_time': 0, + 'resolve': 0, + 'resolve_time': 0 + } + + @property + def can_start_websocket(self): + return self.env.websocket_host is not None and self.env.websocket_port is not None + + def add_metric(self, command, metric): + if self.can_start_websocket: + self.metrics[command] += 1 + self.metrics[f'{command}_time'] += int(metric['total']*1000) + + async def send_metrics(self): + while self.running: + metrics = self.metrics + self.metrics = { + 'search': 0, + 'search_time': 0, + 'resolve': 0, + 'resolve_time': 0 + } + self.websocket.send_message(metrics) + await asyncio.sleep(1) async def start_other(self): args = dict(initializer=reader.initializer, initargs=('claims.db', self.env.coin.NET)) @@ -28,8 +116,16 @@ class LBRYSessionManager(SessionManager): self.query_executor = ProcessPoolExecutor( max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args ) + if self.can_start_websocket: + self.running = True + await self.websocket.start() + self.metric_sender = asyncio.create_task(self.send_metrics()) async def stop_other(self): + if self.can_start_websocket: + self.running = False + self.metric_sender.cancel() + await self.websocket.stop() self.query_executor.shutdown() @@ -58,14 +154,18 @@ class LBRYElectrumX(ElectrumX): async def claimtrie_search(self, **kwargs): if 'claim_id' in kwargs: self.assert_claim_id(kwargs['claim_id']) - return base64.b64encode(await asyncio.get_running_loop().run_in_executor( + data, metrics = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, reader.search_to_bytes, kwargs - )).decode() + ) + self.session_mgr.add_metric('search', metrics) + return base64.b64encode(data).decode() async def claimtrie_resolve(self, *urls): - return base64.b64encode(await asyncio.get_running_loop().run_in_executor( + data, metrics = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, reader.resolve_to_bytes, urls - )).decode() + ) + self.session_mgr.add_metric('resolve', metrics) + return base64.b64encode(data).decode() async def get_server_height(self): return self.bp.height diff --git a/lbry/tests/unit/wallet/server/test_sqldb.py b/lbry/tests/unit/wallet/server/test_sqldb.py index 87c102c43..35efb34ea 100644 --- a/lbry/tests/unit/wallet/server/test_sqldb.py +++ b/lbry/tests/unit/wallet/server/test_sqldb.py @@ -343,7 +343,7 @@ class TestClaimtrie(TestSQLDB): txo_chan_a = tx_chan_a[0].tx.outputs[0] advance(1, [tx_chan_a]) advance(2, [tx_chan_ab]) - r_ab, r_a = reader._search(order_by=['creation_height'], limit=2) + (r_ab, r_a), _ = reader._search(order_by=['creation_height'], limit=2) self.assertEqual("@foo#a", r_a['short_url']) self.assertEqual("@foo#ab", r_ab['short_url']) self.assertIsNone(r_a['canonical_url']) @@ -356,7 +356,7 @@ class TestClaimtrie(TestSQLDB): tx_abc = self.get_stream_with_claim_id_prefix('abc', 65) advance(3, [tx_a]) advance(4, [tx_ab, tx_abc]) - r_abc, r_ab, r_a = reader._search(order_by=['creation_height', 'tx_position'], limit=3) + (r_abc, r_ab, r_a), _ = reader._search(order_by=['creation_height', 'tx_position'], limit=3) self.assertEqual("foo#a", r_a['short_url']) self.assertEqual("foo#ab", r_ab['short_url']) self.assertEqual("foo#abc", r_abc['short_url']) @@ -370,39 +370,39 @@ class TestClaimtrie(TestSQLDB): ab2_claim_id = tx_ab2[0].tx.outputs[0].claim_id advance(6, [tx_a2]) advance(7, [tx_ab2]) - r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2) + (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel']) # change channel public key, invaliding stream claim signatures advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')]) - r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2) + (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertIsNone(r_a2['canonical_url']) self.assertIsNone(r_ab2['canonical_url']) - self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel']) # reinstate previous channel public key (previous stream claim signatures become valid again) channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c') advance(9, [channel_update]) - r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2) + (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel']) # claim abandon updates claims_in_channel advance(10, [self.get_abandon(tx_ab2)]) - self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel']) # delete channel, invaliding stream claim signatures advance(11, [self.get_abandon(channel_update)]) - r_a2, = reader._search(order_by=['creation_height'], limit=1) + (r_a2,), _ = reader._search(order_by=['creation_height'], limit=1) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertIsNone(r_a2['canonical_url']) @@ -440,7 +440,7 @@ class TestTrending(TestSQLDB): self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN), self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN), ]) - results = reader._search(order_by=['trending_local']) + results, _ = reader._search(order_by=['trending_local']) self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results]) self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results]) self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])