refactored websockets and metrics

This commit is contained in:
Lex Berezhny 2019-07-17 21:50:20 -04:00
parent 49b0f59790
commit 046459a72d
4 changed files with 100 additions and 144 deletions

View file

@ -9,7 +9,7 @@ from contextvars import ContextVar
from functools import wraps from functools import wraps
from dataclasses import dataclass from dataclasses import dataclass
from torba.client.basedatabase import query from torba.client.basedatabase import query, interpolate
from lbry.schema.url import URL, normalize_name from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags from lbry.schema.tags import clean_tags
@ -19,6 +19,12 @@ from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger
from .common import CLAIM_TYPES, STREAM_TYPES from .common import CLAIM_TYPES, STREAM_TYPES
class SQLiteOperationalError(sqlite3.OperationalError):
    """Raised when a sqlite query fails to execute.

    Carries the per-query reader metrics gathered up to the failure so the
    session manager can report them alongside the error.
    """

    def __init__(self, metrics):
        # Fixed message — callers discriminate on the exception type, not the text.
        super().__init__('sqlite query errored')
        self.metrics = metrics
class SQLiteInterruptedError(sqlite3.OperationalError): class SQLiteInterruptedError(sqlite3.OperationalError):
def __init__(self, metrics): def __init__(self, metrics):
super().__init__('sqlite query interrupted') super().__init__('sqlite query interrupted')
@ -91,7 +97,6 @@ ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
def initializer(log, _path, _ledger_name, query_timeout, _measure=False): def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
db = sqlite3.connect(_path, isolation_level=None, uri=True) db = sqlite3.connect(_path, isolation_level=None, uri=True)
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
ctx.set( ctx.set(
@ -114,21 +119,18 @@ def measure(func):
state = ctx.get() state = ctx.get()
if not state.is_tracking_metrics: if not state.is_tracking_metrics:
return func(*args, **kwargs) return func(*args, **kwargs)
metric = state.metrics.setdefault(func.__name__, { metric = {}
'calls': 0, 'total': 0, 'isolated': 0, 'errors': 0 state.metrics.setdefault(func.__name__, []).append(metric)
})
state.stack.append([]) state.stack.append([])
start = time.perf_counter() start = time.perf_counter()
try: try:
return func(*args, **kwargs) return func(*args, **kwargs)
except: except:
metric['errors'] += 1
raise raise
finally: finally:
elapsed = int((time.perf_counter()-start)*1000) elapsed = int((time.perf_counter()-start)*1000)
metric['calls'] += 1 metric['total'] = elapsed
metric['total'] += elapsed metric['isolated'] = (elapsed-sum(state.stack.pop()))
metric['isolated'] += (elapsed-sum(state.stack.pop()))
if state.stack: if state.stack:
state.stack[-1].append(elapsed) state.stack[-1].append(elapsed)
return wrapper return wrapper
@ -167,15 +169,16 @@ def execute_query(sql, values) -> List:
try: try:
return context.db.execute(sql, values).fetchall() return context.db.execute(sql, values).fetchall()
except sqlite3.OperationalError as err: except sqlite3.OperationalError as err:
plain_sql = interpolate(sql, values)
if not context.is_tracking_metrics:
context.metric['execute_query'][-1]['sql'] = plain_sql
if str(err) == "interrupted": if str(err) == "interrupted":
query_str = sql context.log.warning("interrupted slow sqlite query:\n%s", plain_sql)
for k in sorted(values.keys(), reverse=True):
query_str = query_str.replace(f":{k}", str(values[k]) if not k.startswith("$") else f"'{values[k]}'")
context.log.warning("interrupted slow sqlite query:\n%s", query_str)
raise SQLiteInterruptedError(context.metrics) raise SQLiteInterruptedError(context.metrics)
context.log.exception('failed running query', exc_info=err)
raise SQLiteOperationalError(context.metrics)
@measure
def get_claims(cols, for_count=False, **constraints) -> List: def get_claims(cols, for_count=False, **constraints) -> List:
if 'order_by' in constraints: if 'order_by' in constraints:
sql_order_by = [] sql_order_by = []
@ -420,7 +423,6 @@ def resolve_url(raw_url):
return channel return channel
@measure
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])

View file

@ -257,7 +257,7 @@ class SQLDB:
if tags: if tags:
self.db.executemany( self.db.executemany(
"INSERT INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values() "INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values()
) )
return claims return claims

View file

@ -4,13 +4,9 @@ import time
import base64 import base64
import asyncio import asyncio
from binascii import hexlify from binascii import hexlify
from weakref import WeakSet
from pylru import lrucache from pylru import lrucache
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
from aiohttp.http_websocket import WSMsgType, WSCloseCode
from torba.rpc.jsonrpc import RPCError, JSONRPC from torba.rpc.jsonrpc import RPCError, JSONRPC
from torba.server.session import ElectrumX, SessionManager from torba.server.session import ElectrumX, SessionManager
from torba.server import util from torba.server import util
@ -18,56 +14,8 @@ from torba.server import util
from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYDB from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.db import reader from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData
class AdminWebSocket:
def __init__(self, manager):
self.manager = manager
self.app = Application()
self.app['websockets'] = WeakSet()
self.app.router.add_get('/', self.on_connect)
self.app.on_shutdown.append(self.on_shutdown)
self.runner = AppRunner(self.app)
async def on_status(self, _):
if not self.app['websockets']:
return
self.send_message({
'type': 'status',
'height': self.manager.daemon.cached_height(),
})
def send_message(self, msg):
for web_socket in self.app['websockets']:
asyncio.create_task(web_socket.send_json(msg))
async def start(self):
await self.runner.setup()
await TCPSite(self.runner, self.manager.env.websocket_host, self.manager.env.websocket_port).start()
async def stop(self):
await self.runner.cleanup()
async def on_connect(self, request):
web_socket = WebSocketResponse()
await web_socket.prepare(request)
self.app['websockets'].add(web_socket)
try:
async for msg in web_socket:
if msg.type == WSMsgType.TEXT:
await self.on_status(None)
elif msg.type == WSMsgType.ERROR:
print('web socket connection closed with exception %s' %
web_socket.exception())
finally:
self.app['websockets'].discard(web_socket)
return web_socket
@staticmethod
async def on_shutdown(app):
for web_socket in set(app['websockets']):
await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')
class ResultCacheItem: class ResultCacheItem:
@ -95,9 +43,8 @@ class LBRYSessionManager(SessionManager):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.query_executor = None self.query_executor = None
self.websocket = None self.websocket = None
self.metrics_processor = None self.metrics = ServerLoadData()
self.command_metrics = {} self.metrics_loop = None
self.reader_metrics = {}
self.running = False self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None: if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self) self.websocket = AdminWebSocket(self)
@ -105,69 +52,11 @@ class LBRYSessionManager(SessionManager):
self.search_cache['search'] = lrucache(10000) self.search_cache['search'] = lrucache(10000)
self.search_cache['resolve'] = lrucache(10000) self.search_cache['resolve'] = lrucache(10000)
def get_command_tracking_info(self, command):
if command not in self.command_metrics:
self.command_metrics[command] = {
'cache_hit': 0,
'started': 0,
'finished': 0,
'total_time': 0,
'execution_time': 0,
'query_time': 0,
'query_count': 0,
'interrupted': 0,
'interrupted_query_values': [],
}
return self.command_metrics[command]
def cache_hit(self, command_name):
if self.env.track_metrics:
command = self.get_command_tracking_info(command_name)
command['cache_hit'] += 1
def start_command_tracking(self, command_name):
if self.env.track_metrics:
command = self.get_command_tracking_info(command_name)
command['started'] += 1
def finish_command_tracking(self, command_name, elapsed, metrics):
if self.env.track_metrics:
command = self.get_command_tracking_info(command_name)
command['finished'] += 1
command['total_time'] += elapsed
if 'execute_query' in metrics:
command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total'])
command['query_time'] += metrics['execute_query']['total']
command['query_count'] += metrics['execute_query']['calls']
for func_name, func_metrics in metrics.items():
reader = self.reader_metrics.setdefault(func_name, {})
for key in func_metrics:
if key not in reader:
reader[key] = func_metrics[key]
else:
reader[key] += func_metrics[key]
def interrupted_command_error(self, command_name, elapsed, metrics, kwargs):
if self.env.track_metrics:
command = self.get_command_tracking_info(command_name)
command['finished'] += 1
command['interrupted'] += 1
command['total_time'] += elapsed
command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total'])
command['query_time'] += metrics['execute_query']['total']
command['query_count'] += metrics['execute_query']['calls']
if len(command['interrupted_query_values']) < 100:
command['interrupted_query_values'].append(kwargs)
async def process_metrics(self): async def process_metrics(self):
while self.running: while self.running:
commands, self.command_metrics = self.command_metrics, {} data = self.metrics.to_json_and_reset({'sessions': self.session_count()})
reader, self.reader_metrics = self.reader_metrics, {}
if self.websocket is not None: if self.websocket is not None:
self.websocket.send_message({ self.websocket.send_message(data)
'commands': commands,
'reader': reader
})
await asyncio.sleep(1) await asyncio.sleep(1)
async def start_other(self): async def start_other(self):
@ -186,12 +75,12 @@ class LBRYSessionManager(SessionManager):
if self.websocket is not None: if self.websocket is not None:
await self.websocket.start() await self.websocket.start()
if self.env.track_metrics: if self.env.track_metrics:
self.metrics_processor = asyncio.create_task(self.process_metrics()) self.metrics_loop = asyncio.create_task(self.process_metrics())
async def stop_other(self): async def stop_other(self):
self.running = False self.running = False
if self.env.track_metrics: if self.env.track_metrics:
self.metrics_processor.cancel() self.metrics_loop.cancel()
if self.websocket is not None: if self.websocket is not None:
await self.websocket.stop() await self.websocket.stop()
self.query_executor.shutdown() self.query_executor.shutdown()
@ -221,24 +110,34 @@ class LBRYElectrumX(ElectrumX):
self.request_handlers.update(handlers) self.request_handlers.update(handlers)
async def run_in_executor(self, name, func, kwargs): async def run_in_executor(self, name, func, kwargs):
start = None result = start = api = None
if self.env.track_metrics: if self.env.track_metrics:
self.session_mgr.start_command_tracking(name) api = self.session_mgr.metrics.for_api(name)
api.start()
start = time.perf_counter() start = time.perf_counter()
try: try:
result = await asyncio.get_running_loop().run_in_executor( result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs self.session_mgr.query_executor, func, kwargs
) )
except reader.SQLiteInterruptedError as error: except reader.SQLiteInterruptedError as error:
self.session_mgr.interrupted_command_error( if self.env.track_metrics:
name, int((time.perf_counter() - start) * 1000), error.metrics, kwargs api.interrupt(start, error.metrics)
)
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
except reader.SQLiteOperationalError as error:
if self.env.track_metrics:
api.error(start, error.metrics)
raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
except:
if self.env.track_metrics:
api.error(start)
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
if self.env.track_metrics: if self.env.track_metrics:
elapsed = int((time.perf_counter() - start) * 1000)
(result, metrics) = result (result, metrics) = result
self.session_mgr.finish_command_tracking(name, elapsed, metrics) api.finish(start, metrics)
return base64.b64encode(result).decode() return base64.b64encode(result).decode()
async def run_and_cache_query(self, query_name, function, kwargs): async def run_and_cache_query(self, query_name, function, kwargs):
@ -248,7 +147,7 @@ class LBRYElectrumX(ElectrumX):
if cache_item is None: if cache_item is None:
cache_item = cache[cache_key] = ResultCacheItem() cache_item = cache[cache_key] = ResultCacheItem()
elif cache_item.result is not None: elif cache_item.result is not None:
self.session_mgr.cache_hit(query_name) self.session_mgr.metrics.for_api(query_name).cache_hit()
return cache_item.result return cache_item.result
async with cache_item.lock: async with cache_item.lock:
result = cache_item.result result = cache_item.result
@ -257,7 +156,7 @@ class LBRYElectrumX(ElectrumX):
query_name, function, kwargs query_name, function, kwargs
) )
else: else:
self.session_mgr.cache_hit(query_name) self.session_mgr.metrics.for_api(query_name).cache_hit()
return result return result
async def claimtrie_search(self, **kwargs): async def claimtrie_search(self, **kwargs):

View file

@ -0,0 +1,55 @@
import asyncio
from weakref import WeakSet
from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
from aiohttp.http_websocket import WSMsgType, WSCloseCode
class AdminWebSocket:
    """Small aiohttp websocket endpoint used to push server status/metrics
    to connected admin clients."""

    def __init__(self, manager):
        self.manager = manager
        self.app = Application()
        # WeakSet so closed/garbage-collected connections drop out automatically.
        self.app['websockets'] = WeakSet()
        self.app.router.add_get('/', self.on_connect)
        self.app.on_shutdown.append(self.on_shutdown)
        self.runner = AppRunner(self.app)

    async def start(self):
        await self.runner.setup()
        site = TCPSite(self.runner, self.manager.env.websocket_host, self.manager.env.websocket_port)
        await site.start()

    async def stop(self):
        await self.runner.cleanup()

    def send_message(self, msg):
        # Fan the message out to every live connection; each send runs as its own task.
        for ws in self.app['websockets']:
            asyncio.create_task(ws.send_json(msg))

    async def on_status(self, _):
        if self.app['websockets']:
            self.send_message({
                'type': 'status',
                'height': self.manager.daemon.cached_height(),
            })

    async def on_connect(self, request):
        ws = WebSocketResponse()
        await ws.prepare(request)
        self.app['websockets'].add(ws)
        try:
            async for message in ws:
                if message.type == WSMsgType.TEXT:
                    # Any text frame is treated as a status request.
                    await self.on_status(None)
                elif message.type == WSMsgType.ERROR:
                    print('web socket connection closed with exception %s' %
                          ws.exception())
        finally:
            self.app['websockets'].discard(ws)
        return ws

    @staticmethod
    async def on_shutdown(app):
        # Snapshot the WeakSet before iterating; closing mutates it.
        for ws in list(app['websockets']):
            await ws.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')