instrument wallet.server.db.reader with timer metrics

Lex Berezhny 2019-07-12 01:38:03 -04:00
parent 8ca74d3de2
commit dc8b4b5252
3 changed files with 166 additions and 46 deletions
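
The diff follows one convention throughout: each instrumented function in the reader returns its usual result plus the wall-clock seconds it took, and the composite entry points gather those timings into a `_metrics` dict keyed by stage ('count', 'claim.search', 'channel.search', 'stream.search', and a final 'total'). A minimal sketch of that shape; the `timed` helper below is illustrative and not part of the commit:

    import time

    def timed(fn, *args, **kwargs):
        # wrap any call so it reports (result, elapsed_seconds),
        # the pair every instrumented reader function returns
        start = time.time()
        result = fn(*args, **kwargs)
        return result, time.time() - start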

View file

@@ -1,6 +1,7 @@
-import sqlite3
+import time
 import struct
-from typing import Tuple, List
+import sqlite3
+from typing import Tuple, List, Any
 from binascii import unhexlify
 from decimal import Decimal
 from contextvars import ContextVar
@@ -66,7 +67,7 @@ def cleanup():
     ledger.set(None)
 
 
-def get_claims(cols, for_count=False, **constraints):
+def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
     if 'order_by' in constraints:
         sql_order_by = []
         for order_by in constraints['order_by']:
@@ -124,11 +125,11 @@ def get_claims(cols, for_count=False, **constraints):
     if 'channel' in constraints:
         channel_url = constraints.pop('channel')
-        match = _resolve_one(channel_url)
+        match, _ = _resolve_one(channel_url)
         if isinstance(match, sqlite3.Row):
             constraints['channel_hash'] = match['claim_hash']
         else:
-            return [[0]] if cols == 'count(*)' else []
+            return [[0]] if cols == 'count(*)' else [], 0
     if 'channel_hash' in constraints:
         constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash'))
     if 'channel_ids' in constraints:
@@ -197,37 +198,46 @@ def get_claims(cols, for_count=False, **constraints):
         LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash)
         """, **constraints
     )
-    return db.get().execute(sql, values).fetchall()
+    start = time.time()
+    result = db.get().execute(sql, values).fetchall()
+    return result, time.time()-start
 
 
-def get_claims_count(**constraints):
+def get_claims_count(**constraints) -> Tuple[int, float]:
     constraints.pop('offset', None)
     constraints.pop('limit', None)
     constraints.pop('order_by', None)
-    count = get_claims('count(*)', for_count=True, **constraints)
-    return count[0][0]
+    count, elapsed = get_claims('count(*)', for_count=True, **constraints)
+    return count[0][0], elapsed
 
 
-def search(constraints) -> Tuple[List, List, int, int]:
+def search(constraints) -> Tuple[List, List, int, int, dict]:
     assert set(constraints).issubset(SEARCH_PARAMS), \
         f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
+    _metrics = {}
     total = None
     if not constraints.pop('no_totals', False):
-        total = get_claims_count(**constraints)
+        total, _metrics['count'] = get_claims_count(**constraints)
     constraints['offset'] = abs(constraints.get('offset', 0))
     constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
     if 'order_by' not in constraints:
         constraints['order_by'] = ["height", "^name"]
-    txo_rows = _search(**constraints)
+    txo_rows, _metrics['claim.search'] = _search(**constraints)
     channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash'])
     extra_txo_rows = []
     if channel_hashes:
-        extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
-    return txo_rows, extra_txo_rows, constraints['offset'], total
+        extra_txo_rows, _metrics['channel.search'] = _search(
+            **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}
+        )
+    return txo_rows, extra_txo_rows, constraints['offset'], total, _metrics
 
 
-def search_to_bytes(constraints) -> bytes:
-    return Outputs.to_bytes(*search(constraints))
+def search_to_bytes(constraints) -> Tuple[bytes, dict]:
+    start = time.time()
+    *result, _metrics = search(constraints)
+    output = Outputs.to_bytes(*result)
+    _metrics['total'] = time.time()-start
+    return output, _metrics
 
 
 def _search(**constraints):
@@ -249,29 +259,39 @@ def _search(**constraints):
     )
 
 
-def resolve(urls) -> Tuple[List, List]:
+def resolve(urls) -> Tuple[List, List, dict]:
     result = []
+    _metrics = {'urls': []}
     channel_hashes = set()
     for raw_url in urls:
-        match = _resolve_one(raw_url)
+        match, metric = _resolve_one(raw_url)
         result.append(match)
+        _metrics['urls'].append(metric)
         if isinstance(match, sqlite3.Row) and match['channel_hash']:
             channel_hashes.add(match['channel_hash'])
     extra_txo_rows = []
     if channel_hashes:
-        extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
-    return result, extra_txo_rows
+        extra_txo_rows, _metrics['channel.search'] = _search(
+            **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}
+        )
+    return result, extra_txo_rows, _metrics
 
 
-def resolve_to_bytes(urls) -> bytes:
-    return Outputs.to_bytes(*resolve(urls))
+def resolve_to_bytes(urls) -> Tuple[bytes, dict]:
+    start = time.time()
+    *result, _metrics = resolve(urls)
+    output = Outputs.to_bytes(*result)
+    _metrics['total'] = time.time()-start
+    return output, _metrics
 
 
-def _resolve_one(raw_url):
+def _resolve_one(raw_url) -> Tuple[Any, dict]:
+    _metrics = {}
     try:
         url = URL.parse(raw_url)
     except ValueError as e:
-        return e
+        return e, _metrics
 
     channel = None
@@ -281,11 +301,11 @@ def _resolve_one(raw_url):
             query['is_controlling'] = True
         else:
             query['order_by'] = ['^height']
-        matches = _search(**query, limit=1)
+        matches, _metrics['channel.search'] = _search(**query, limit=1)
         if matches:
             channel = matches[0]
         else:
-            return LookupError(f'Could not find channel in "{raw_url}".')
+            return LookupError(f'Could not find channel in "{raw_url}".'), _metrics
 
     if url.has_stream:
         query = url.stream.to_dict()
@@ -299,13 +319,13 @@ def _resolve_one(raw_url):
             query['signature_valid'] = 1
         elif set(query) == {'name'}:
             query['is_controlling'] = 1
-        matches = _search(**query, limit=1)
+        matches, _metrics['stream.search'] = _search(**query, limit=1)
         if matches:
-            return matches[0]
+            return matches[0], _metrics
         else:
-            return LookupError(f'Could not find stream in "{raw_url}".')
+            return LookupError(f'Could not find stream in "{raw_url}".'), _metrics
 
-    return channel
+    return channel, _metrics
 
 
 def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
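
With reader.py instrumented, every public entry point now returns a metrics dict alongside its payload, so callers that previously wrote `Outputs.to_bytes(*search(constraints))` unpack two values instead. A hypothetical usage sketch; the constraints and timing numbers are invented, and it assumes the module was prepared via `reader.initializer` as the session manager does below:

    output, _metrics = search_to_bytes({'name': 'foo', 'limit': 10})
    # _metrics now holds per-stage seconds plus the overall time, e.g.
    # {'count': 0.002, 'claim.search': 0.011, 'channel.search': 0.004, 'total': 0.019}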

View file

@@ -3,6 +3,11 @@ import math
 import base64
 import asyncio
 from binascii import hexlify
+from weakref import WeakSet
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
+from aiohttp.http_websocket import WSMsgType, WSCloseCode
 from torba.rpc.jsonrpc import RPCError
 from torba.server.session import ElectrumX, SessionManager
@@ -11,7 +16,60 @@ from torba.server import util
 from lbry.wallet.server.block_processor import LBRYBlockProcessor
 from lbry.wallet.server.db.writer import LBRYDB
 from lbry.wallet.server.db import reader
-from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 
 
+class AdminWebSocket:
+
+    def __init__(self, manager):
+        self.manager = manager
+        self.app = Application()
+        self.app['websockets'] = WeakSet()
+        self.app.router.add_get('/', self.on_connect)
+        self.app.on_shutdown.append(self.on_shutdown)
+        self.runner = AppRunner(self.app)
+
+    async def on_status(self, _):
+        if not self.app['websockets']:
+            return
+        self.send_message({
+            'type': 'status',
+            'height': self.manager.daemon.cached_height(),
+        })
+
+    def send_message(self, msg):
+        for web_socket in self.app['websockets']:
+            asyncio.create_task(web_socket.send_json(msg))
+
+    async def start(self):
+        await self.runner.setup()
+        await TCPSite(self.runner, self.manager.env.websocket_host, self.manager.env.websocket_port).start()
+        print('started websocket')
+
+    async def stop(self):
+        await self.runner.cleanup()
+        print('stopped websocket')
+
+    async def on_connect(self, request):
+        web_socket = WebSocketResponse()
+        await web_socket.prepare(request)
+        self.app['websockets'].add(web_socket)
+        try:
+            async for msg in web_socket:
+                if msg.type == WSMsgType.TEXT:
+                    print(msg.data)
+                    await self.on_status(None)
+                elif msg.type == WSMsgType.ERROR:
+                    print('web socket connection closed with exception %s' %
+                          web_socket.exception())
+        finally:
+            self.app['websockets'].discard(web_socket)
+        return web_socket
+
+    @staticmethod
+    async def on_shutdown(app):
+        print('disconnecting websockets')
+        for web_socket in app['websockets']:
+            await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')
+
+
 class LBRYSessionManager(SessionManager):
@@ -19,6 +77,36 @@ class LBRYSessionManager(SessionManager):
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.query_executor = None
+        if self.can_start_websocket:
+            self.websocket = AdminWebSocket(self)
+            self.metric_sender = None
+            self.metrics = {
+                'search': 0,
+                'search_time': 0,
+                'resolve': 0,
+                'resolve_time': 0
+            }
+
+    @property
+    def can_start_websocket(self):
+        return self.env.websocket_host is not None and self.env.websocket_port is not None
+
+    def add_metric(self, command, metric):
+        if self.can_start_websocket:
+            self.metrics[command] += 1
+            self.metrics[f'{command}_time'] += int(metric['total']*1000)
+
+    async def send_metrics(self):
+        while self.running:
+            metrics = self.metrics
+            self.metrics = {
+                'search': 0,
+                'search_time': 0,
+                'resolve': 0,
+                'resolve_time': 0
+            }
+            self.websocket.send_message(metrics)
+            await asyncio.sleep(1)
 
     async def start_other(self):
         args = dict(initializer=reader.initializer, initargs=('claims.db', self.env.coin.NET))
@@ -28,8 +116,16 @@ class LBRYSessionManager(SessionManager):
             self.query_executor = ProcessPoolExecutor(
                 max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args
             )
+        if self.can_start_websocket:
+            self.running = True
+            await self.websocket.start()
+            self.metric_sender = asyncio.create_task(self.send_metrics())
 
     async def stop_other(self):
+        if self.can_start_websocket:
+            self.running = False
+            self.metric_sender.cancel()
+            await self.websocket.stop()
         self.query_executor.shutdown()
@@ -58,14 +154,18 @@ class LBRYElectrumX(ElectrumX):
     async def claimtrie_search(self, **kwargs):
         if 'claim_id' in kwargs:
             self.assert_claim_id(kwargs['claim_id'])
-        return base64.b64encode(await asyncio.get_running_loop().run_in_executor(
+        data, metrics = await asyncio.get_running_loop().run_in_executor(
             self.session_mgr.query_executor, reader.search_to_bytes, kwargs
-        )).decode()
+        )
+        self.session_mgr.add_metric('search', metrics)
+        return base64.b64encode(data).decode()
 
     async def claimtrie_resolve(self, *urls):
-        return base64.b64encode(await asyncio.get_running_loop().run_in_executor(
+        data, metrics = await asyncio.get_running_loop().run_in_executor(
             self.session_mgr.query_executor, reader.resolve_to_bytes, urls
-        )).decode()
+        )
+        self.session_mgr.add_metric('resolve', metrics)
+        return base64.b64encode(data).decode()
 
     async def get_server_height(self):
         return self.bp.height
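
On the session side the per-request metrics are folded into rolling one-second counters: `add_metric` bumps a call count and a cumulative millisecond total, and `send_metrics` snapshots and resets the dict each second before pushing it to the admin websocket. A self-contained sketch of that aggregate-and-reset loop, with a plain `send` callable standing in for `AdminWebSocket.send_message`:

    import asyncio

    EMPTY = {'search': 0, 'search_time': 0, 'resolve': 0, 'resolve_time': 0}
    metrics = dict(EMPTY)

    def add_metric(command, metric):
        # one more call, plus its total duration in milliseconds
        metrics[command] += 1
        metrics[f'{command}_time'] += int(metric['total'] * 1000)

    async def send_metrics(send):
        global metrics
        while True:
            snapshot, metrics = metrics, dict(EMPTY)
            send(snapshot)  # e.g. {'search': 42, 'search_time': 380, 'resolve': 7, 'resolve_time': 55}
            await asyncio.sleep(1)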

View file

@@ -343,7 +343,7 @@ class TestClaimtrie(TestSQLDB):
         txo_chan_a = tx_chan_a[0].tx.outputs[0]
         advance(1, [tx_chan_a])
         advance(2, [tx_chan_ab])
-        r_ab, r_a = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab, r_a), _ = reader._search(order_by=['creation_height'], limit=2)
         self.assertEqual("@foo#a", r_a['short_url'])
         self.assertEqual("@foo#ab", r_ab['short_url'])
         self.assertIsNone(r_a['canonical_url'])
@@ -356,7 +356,7 @@ class TestClaimtrie(TestSQLDB):
         tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
         advance(3, [tx_a])
         advance(4, [tx_ab, tx_abc])
-        r_abc, r_ab, r_a = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
+        (r_abc, r_ab, r_a), _ = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
         self.assertEqual("foo#a", r_a['short_url'])
         self.assertEqual("foo#ab", r_ab['short_url'])
         self.assertEqual("foo#abc", r_abc['short_url'])
@@ -370,39 +370,39 @@ class TestClaimtrie(TestSQLDB):
         ab2_claim_id = tx_ab2[0].tx.outputs[0].claim_id
         advance(6, [tx_a2])
         advance(7, [tx_ab2])
-        r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url'])
         self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
         self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
 
         # change channel public key, invalidating stream claim signatures
         advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
-        r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url'])
         self.assertIsNone(r_a2['canonical_url'])
         self.assertIsNone(r_ab2['canonical_url'])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
 
         # reinstate previous channel public key (previous stream claim signatures become valid again)
         channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
         advance(9, [channel_update])
-        r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url'])
         self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
         self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
 
         # claim abandon updates claims_in_channel
         advance(10, [self.get_abandon(tx_ab2)])
-        self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
 
         # delete channel, invalidating stream claim signatures
         advance(11, [self.get_abandon(channel_update)])
-        r_a2, = reader._search(order_by=['creation_height'], limit=1)
+        (r_a2,), _ = reader._search(order_by=['creation_height'], limit=1)
         self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
         self.assertIsNone(r_a2['canonical_url'])
@@ -440,7 +440,7 @@ class TestTrending(TestSQLDB):
                 self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
                 self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
             ])
-            results = reader._search(order_by=['trending_local'])
+            results, _ = reader._search(order_by=['trending_local'])
             self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
             self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
             self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
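
The test changes are mechanical: `reader._search` now returns a `(rows, elapsed)` pair, so each call site either destructures the pair or adds one more `[0]` before the row lookup. The pattern, with placeholder names:

    rows, _ = reader._search(order_by=['trending_local'])
    (newest, oldest), _ = reader._search(order_by=['creation_height'], limit=2)
    # single-row lookups index the rows list first, then the row:
    count = reader._search(claim_id=some_claim_id, limit=1)[0][0]['claims_in_channel']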