refactored metrics gathering for wallet server

Lex Berezhny 2019-07-13 00:34:40 -04:00
parent 7d2ad5e4ed
commit db3f9af174
2 changed files with 195 additions and 99 deletions


@@ -1,16 +1,19 @@
import time
import struct
import sqlite3
from typing import Tuple, List, Any
from typing import Tuple, List, Dict, Union, Type, Optional
from binascii import unhexlify
from decimal import Decimal
from contextvars import ContextVar
from functools import wraps
from dataclasses import dataclass
from torba.client.basedatabase import query
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs
from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger
from .common import CLAIM_TYPES, STREAM_TYPES
@@ -49,25 +52,96 @@ PRAGMAS = """
"""
db = ContextVar('db')
ledger = ContextVar('ledger')


@dataclass
class ReaderState:
    db: sqlite3.Connection
    stack: List[List]
    metrics: Dict
    is_tracking_metrics: bool
    ledger: Type[BaseLedger]

    def close(self):
        self.db.close()

    def reset_metrics(self):
        self.stack = []
        self.metrics = {}
def initializer(_path, _ledger_name):
    _db = sqlite3.connect(_path, isolation_level=None, uri=True)
    _db.row_factory = sqlite3.Row
    db.set(_db)
    from lbry.wallet.ledger import MainNetLedger, RegTestLedger
    ledger.set(MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger)
ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')


def initializer(_path, _ledger_name, _measure=False):
    db = sqlite3.connect(_path, isolation_level=None, uri=True)
    db.row_factory = sqlite3.Row
    ctx.set(ReaderState(
        db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
        ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger
    ))
def cleanup():
    db.get().close()
    db.set(None)
    ledger.set(None)
    ctx.get().close()
    ctx.set(None)
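For orientation, a minimal sketch (not part of this commit) of how initializer and cleanup are meant to be used: each worker process gets its own ReaderState through the ctx ContextVar. The path and worker count below are illustrative; the real wiring happens in start_other in the second file.

# Hypothetical wiring sketch; the actual values come from env settings below.
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(
    max_workers=4,                           # illustrative
    initializer=initializer,                 # runs once per worker process
    initargs=('claims.db', 'mainnet', True)  # _measure=True enables metrics
)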
def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
def measure(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        state = ctx.get()
        if not state.is_tracking_metrics:
            return func(*args, **kwargs)
        metric = state.metrics.setdefault(
            func.__name__, {'calls': 0, 'total': 0, 'isolated': 0, 'errors': 0}
        )
        state.stack.append([])
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        except:
            metric['errors'] += 1
            raise
        finally:
            elapsed = int((time.perf_counter()-start)*1000)
            metric['calls'] += 1
            metric['total'] += elapsed
            metric['isolated'] += (elapsed-sum(state.stack.pop()))
            if state.stack:
                state.stack[-1].append(elapsed)
    return wrapper
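Worth spelling out the stack bookkeeping: each @measure call pushes a child-time list, and on exit the sum of nested call times is subtracted from its own elapsed time, so 'isolated' is the time spent in the function itself. A sketch of the resulting dict, with illustrative numbers:

# Example shape of ReaderState.metrics after one tracked search (times in ms,
# values illustrative): a 90ms search() that spent 60ms inside nested
# @measure calls records isolated=30 for itself.
{
    'search': {'calls': 1, 'total': 90, 'isolated': 30, 'errors': 0},
    '_search': {'calls': 2, 'total': 60, 'isolated': 60, 'errors': 0},
}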
def reports_metrics(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        state = ctx.get()
        if not state.is_tracking_metrics:
            return func(*args, **kwargs)
        state.reset_metrics()
        r = func(*args, **kwargs)
        return r, state.metrics
    return wrapper
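This decorator changes the wrapped function's return shape, which is why the two entry points below are annotated Union[bytes, Tuple[bytes, Dict]]. Illustrative call shapes (the 'name' constraint is assumed here):

encoded = search_to_bytes({'name': 'test'})           # tracking off -> bytes
encoded, metrics = search_to_bytes({'name': 'test'})  # tracking on -> (bytes, metrics)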
@reports_metrics
def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]:
    return encode_result(search(constraints))


@reports_metrics
def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]:
    return encode_result(resolve(urls))


def encode_result(result):
    return Outputs.to_bytes(*result)


@measure
def execute_query(sql, values) -> List:
    return ctx.get().db.execute(sql, values).fetchall()


@measure
def get_claims(cols, for_count=False, **constraints) -> List:
    if 'order_by' in constraints:
        sql_order_by = []
        for order_by in constraints['order_by']:
@@ -121,15 +195,15 @@ def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
    if 'public_key_id' in constraints:
        constraints['claim.public_key_hash'] = sqlite3.Binary(
            ledger.get().address_to_hash160(constraints.pop('public_key_id')))
            ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id')))
    if 'channel' in constraints:
        channel_url = constraints.pop('channel')
        match, _ = _resolve_one(channel_url)
        match = resolve_url(channel_url)
        if isinstance(match, sqlite3.Row):
            constraints['channel_hash'] = match['claim_hash']
        else:
            return [[0]] if cols == 'count(*)' else [], 0
            return [[0]] if cols == 'count(*)' else []
    if 'channel_hash' in constraints:
        constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash'))
    if 'channel_ids' in constraints:
@@ -198,46 +272,38 @@ def get_claims(cols, for_count=False, **constraints) -> Tuple[List, float]:
        LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash)
        """, **constraints
    )
    start = time.time()
    result = db.get().execute(sql, values).fetchall()
    return result, time.time()-start
    return execute_query(sql, values)
def get_claims_count(**constraints) -> Tuple[int, float]:
@measure
def get_claims_count(**constraints) -> int:
    constraints.pop('offset', None)
    constraints.pop('limit', None)
    constraints.pop('order_by', None)
    count, elapsed = get_claims('count(*)', for_count=True, **constraints)
    return count[0][0], elapsed
    count = get_claims('count(*)', for_count=True, **constraints)
    return count[0][0]
def search(constraints) -> Tuple[List, List, int, int, dict]:
@measure
def search(constraints) -> Tuple[List, List, int, int]:
    assert set(constraints).issubset(SEARCH_PARAMS), \
        f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
    _metrics = {}
    total = None
    if not constraints.pop('no_totals', False):
        total, _metrics['count'] = get_claims_count(**constraints)
        total = get_claims_count(**constraints)
    constraints['offset'] = abs(constraints.get('offset', 0))
    constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
    if 'order_by' not in constraints:
        constraints['order_by'] = ["height", "^name"]
    txo_rows, _metrics['claim.search'] = _search(**constraints)
    txo_rows = _search(**constraints)
    channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash'])
    extra_txo_rows = []
    if channel_hashes:
        extra_txo_rows, _metrics['channel.search'] = _search(
        extra_txo_rows = _search(
            **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}
        )
    return txo_rows, extra_txo_rows, constraints['offset'], total, _metrics


def search_to_bytes(constraints) -> (bytes, dict):
    start = time.time()
    *result, _metrics = search(constraints)
    output = Outputs.to_bytes(*result)
    _metrics['total'] = time.time()-start
    return output, _metrics
    return txo_rows, extra_txo_rows, constraints['offset'], total
def _search(**constraints):
@@ -259,39 +325,30 @@ def _search(**constraints):
    )


def resolve(urls) -> Tuple[List, List, dict]:
@measure
def resolve(urls) -> Tuple[List, List]:
    result = []
    _metrics = {'urls': []}
    channel_hashes = set()
    for raw_url in urls:
        match, metric = _resolve_one(raw_url)
        match = resolve_url(raw_url)
        result.append(match)
        _metrics['urls'].append(metric)
        if isinstance(match, sqlite3.Row) and match['channel_hash']:
            channel_hashes.add(match['channel_hash'])
    extra_txo_rows = []
    if channel_hashes:
        extra_txo_rows, _metrics['channel.search'] = _search(
        extra_txo_rows = _search(
            **{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}
        )
    return result, extra_txo_rows, _metrics
    return result, extra_txo_rows


def resolve_to_bytes(urls) -> Tuple[bytes, dict]:
    start = time.time()
    *result, _metrics = resolve(urls)
    output = Outputs.to_bytes(*result)  # pylint: disable=E1120
    _metrics['total'] = time.time()-start
    return output, _metrics
def _resolve_one(raw_url) -> Tuple[Any, dict]:
    _metrics = {}
@measure
def resolve_url(raw_url):
    try:
        url = URL.parse(raw_url)
    except ValueError as e:
        return e, _metrics
        return e
    channel = None
@@ -301,11 +358,11 @@ def _resolve_one(raw_url) -> Tuple[Any, dict]:
            query['is_controlling'] = True
        else:
            query['order_by'] = ['^height']
        matches, _metrics['channel.search'] = _search(**query, limit=1)
        matches = _search(**query, limit=1)
        if matches:
            channel = matches[0]
        else:
            return LookupError(f'Could not find channel in "{raw_url}".'), _metrics
            return LookupError(f'Could not find channel in "{raw_url}".')

    if url.has_stream:
        query = url.stream.to_dict()
@@ -319,15 +376,16 @@ def _resolve_one(raw_url) -> Tuple[Any, dict]:
            query['signature_valid'] = 1
        elif set(query) == {'name'}:
            query['is_controlling'] = 1
        matches, _metrics['stream.search'] = _search(**query, limit=1)
        matches = _search(**query, limit=1)
        if matches:
            return matches[0], _metrics
            return matches[0]
        else:
            return LookupError(f'Could not find stream in "{raw_url}".'), _metrics
            return LookupError(f'Could not find stream in "{raw_url}".')
    return channel, _metrics
    return channel


@measure
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
    any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
    if any_items:


@@ -1,5 +1,6 @@
import os
import math
import time
import base64
import asyncio
from binascii import hexlify
@@ -77,54 +78,80 @@ class LBRYSessionManager(SessionManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_executor = None
        if self.can_start_websocket:
        self.websocket = None
        self.metrics_processor = None
        self.command_metrics = {}
        self.reader_metrics = {}
        self.running = False
        if self.env.websocket_host is not None and self.env.websocket_port is not None:
            self.websocket = AdminWebSocket(self)
            self.metric_sender = None
            self.metrics = {
                'search': 0,
                'search_time': 0,
                'resolve': 0,
                'resolve_time': 0
            }

    def get_command_tracking_info(self, command):
        if command not in self.command_metrics:
            self.command_metrics[command] = {
                'started': 0,
                'finished': 0,
                'total_time': 0,
                'execution_time': 0,
                'query_time': 0,
                'query_count': 0,
            }
        return self.command_metrics[command]
    @property
    def can_start_websocket(self):
        return self.env.websocket_host is not None and self.env.websocket_port is not None

    def start_command_tracking(self, command_name):
        if self.env.track_metrics:
            command = self.get_command_tracking_info(command_name)
            command['started'] += 1

    def add_metric(self, command, metric):
        if self.can_start_websocket:
            self.metrics[command] += 1
            self.metrics[f'{command}_time'] += int(metric['total']*1000)

    def finish_command_tracking(self, command_name, elapsed, metrics):
        if self.env.track_metrics:
            command = self.get_command_tracking_info(command_name)
            command['finished'] += 1
            command['total_time'] += elapsed
            command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total'])
            command['query_time'] += metrics['execute_query']['total']
            command['query_count'] += metrics['execute_query']['calls']
            for func_name, func_metrics in metrics.items():
                reader = self.reader_metrics.setdefault(func_name, {})
                for key in func_metrics:
                    if key not in reader:
                        reader[key] = func_metrics[key]
                    else:
                        reader[key] += func_metrics[key]
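The split above is the heart of the accounting: per command, time inside execute_query counts as query time, and the remainder of the command's top-level total counts as execution (Python-side) time. With illustrative numbers:

# Illustrative: one search call whose reader metrics came back as below (ms).
metrics = {
    'search': {'total': 90},                    # time in search() overall
    'execute_query': {'total': 60, 'calls': 2}  # time spent inside SQLite
}
execution_time = metrics['search']['total'] - metrics['execute_query']['total']  # 30
query_time = metrics['execute_query']['total']                                   # 60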
    async def send_metrics(self):
    async def process_metrics(self):
        while self.running:
            metrics = self.metrics
            self.metrics = {
                'search': 0,
                'search_time': 0,
                'resolve': 0,
                'resolve_time': 0
            }
            self.websocket.send_message(metrics)
            commands, self.command_metrics = self.command_metrics, {}
            reader, self.reader_metrics = self.reader_metrics, {}
            if self.websocket is not None:
                self.websocket.send_message({
                    'commands': commands,
                    'reader': reader
                })
            await asyncio.sleep(1)
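With tracking enabled, the admin websocket thus receives one message per second combining both views. A sketch of the payload, with illustrative field values:

# Example message pushed to AdminWebSocket (values illustrative):
{
    'commands': {
        'search': {'started': 12, 'finished': 12, 'total_time': 840,
                   'execution_time': 310, 'query_time': 530, 'query_count': 31}
    },
    'reader': {
        'search': {'calls': 12, 'total': 700, 'isolated': 200, 'errors': 0}
    }
}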
    async def start_other(self):
        args = dict(initializer=reader.initializer, initargs=('claims.db', self.env.coin.NET))
        self.running = True
        args = dict(
            initializer=reader.initializer,
            initargs=('claims.db', self.env.coin.NET, self.env.track_metrics)
        )
        if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
            self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
        else:
            self.query_executor = ProcessPoolExecutor(
                max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args
            )
        if self.can_start_websocket:
            self.running = True
        if self.websocket is not None:
            await self.websocket.start()
            self.metric_sender = asyncio.create_task(self.send_metrics())
        if self.env.track_metrics:
            self.metrics_processor = asyncio.create_task(self.process_metrics())

    async def stop_other(self):
        if self.can_start_websocket:
            self.running = False
            self.metric_sender.cancel()
        self.running = False
        if self.env.track_metrics:
            self.metrics_processor.cancel()
        if self.websocket is not None:
            await self.websocket.stop()
        self.query_executor.shutdown()
@@ -132,6 +159,7 @@ class LBRYSessionManager(SessionManager):
class LBRYElectrumX(ElectrumX):
    PROTOCOL_MIN = (0, 0)  # temporary, for supporting 0.10 protocol
    max_errors = math.inf  # don't disconnect people for errors! let them happen...
    session_mgr: LBRYSessionManager

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
@@ -151,21 +179,31 @@ class LBRYElectrumX(ElectrumX):
        }
        self.request_handlers.update(handlers)

    async def run_in_executor(self, name, func, kwargs):
        start = None
        if self.env.track_metrics:
            self.session_mgr.start_command_tracking(name)
            start = time.perf_counter()
        result = await asyncio.get_running_loop().run_in_executor(
            self.session_mgr.query_executor, func, kwargs
        )
        if self.env.track_metrics:
            elapsed = int((time.perf_counter() - start) * 1000)
            (result, metrics) = result
            self.session_mgr.finish_command_tracking(name, elapsed, metrics)
        return result
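The tuple unpacking above relies on a contract with the reader module: when track_metrics is on, initializer enables is_tracking_metrics in every worker, so the @reports_metrics entry points return (bytes, metrics) instead of plain bytes. A sketch (loop and pool are stand-in names):

# track_metrics on: workers return a pair
data, metrics = await loop.run_in_executor(pool, reader.search_to_bytes, kwargs)
# track_metrics off: the same call returns just the encoded bytes
data = await loop.run_in_executor(pool, reader.search_to_bytes, kwargs)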
    async def claimtrie_search(self, **kwargs):
        if 'claim_id' in kwargs:
            self.assert_claim_id(kwargs['claim_id'])
        data, metrics = await asyncio.get_running_loop().run_in_executor(
            self.session_mgr.query_executor, reader.search_to_bytes, kwargs
        )
        self.session_mgr.add_metric('search', metrics)
        return base64.b64encode(data).decode()
        return base64.b64encode(
            await self.run_in_executor('search', reader.search_to_bytes, kwargs)
        ).decode()

    async def claimtrie_resolve(self, *urls):
        data, metrics = await asyncio.get_running_loop().run_in_executor(
            self.session_mgr.query_executor, reader.resolve_to_bytes, urls
        )
        self.session_mgr.add_metric('resolve', metrics)
        return base64.b64encode(data).decode()
        return base64.b64encode(
            await self.run_in_executor('resolve', reader.resolve_to_bytes, urls)
        ).decode()

    async def get_server_height(self):
        return self.bp.height