From d319761483315048a0b8326de56ebabf38bfa18c Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 16 Jul 2019 12:26:28 -0400
Subject: [PATCH 1/3] interrupt slow queries and log them for debugging

- add QUERY_TIMEOUT_MS env variable
- return sqlite timeout errors, jsonrpc error code -32000
---
 lbry/lbry/wallet/server/db/reader.py        | 45 ++++++++++++++++++---
 lbry/lbry/wallet/server/session.py          | 33 ++++++++++++---
 lbry/scripts/sqlite_perf_test.py            |  9 ++++-
 lbry/tests/unit/wallet/server/test_sqldb.py |  4 +-
 torba/torba/rpc/jsonrpc.py                  |  2 +
 torba/torba/server/env.py                   |  1 +
 6 files changed, 79 insertions(+), 15 deletions(-)

diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py
index 97b72553e..e09ea5673 100644
--- a/lbry/lbry/wallet/server/db/reader.py
+++ b/lbry/lbry/wallet/server/db/reader.py
@@ -1,6 +1,7 @@
 import time
 import struct
 import sqlite3
+import logging
 from typing import Tuple, List, Dict, Union, Type, Optional
 from binascii import unhexlify
 from decimal import Decimal
@@ -18,6 +19,12 @@ from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger
 from .common import CLAIM_TYPES, STREAM_TYPES
 
 
+class SQLiteInterruptedError(sqlite3.OperationalError):
+    def __init__(self, metrics):
+        super().__init__('sqlite query interrupted')
+        self.metrics = metrics
+
+
 ATTRIBUTE_ARRAY_MAX_LENGTH = 100
 
 
@@ -59,6 +66,8 @@ class ReaderState:
     metrics: Dict
     is_tracking_metrics: bool
     ledger: Type[BaseLedger]
+    query_timeout: float
+    log: logging.Logger
 
     def close(self):
         self.db.close()
@@ -67,17 +76,31 @@ class ReaderState:
         self.stack = []
         self.metrics = {}
 
+    def set_query_timeout(self):
+        stop_at = time.perf_counter() + self.query_timeout
+
+        def interruptor():
+            if time.perf_counter() >= stop_at:
+                self.db.interrupt()
+            return
+
+        self.db.set_progress_handler(interruptor, 100)
+
 
 ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
 
 
-def initializer(_path, _ledger_name, _measure=False):
+def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
+
     db = sqlite3.connect(_path, isolation_level=None, uri=True)
     db.row_factory = sqlite3.Row
-    ctx.set(ReaderState(
-        db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
-        ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger
-    ))
+    ctx.set(
+        ReaderState(
+            db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
+            ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger,
+            query_timeout=query_timeout, log=log
+        )
+    )
 
 
 def cleanup():
@@ -139,7 +162,17 @@ def encode_result(result):
 
 @measure
 def execute_query(sql, values) -> List:
-    return ctx.get().db.execute(sql, values).fetchall()
+    context = ctx.get()
+    context.set_query_timeout()
+    try:
+        return context.db.execute(sql, values).fetchall()
+    except sqlite3.OperationalError as err:
+        if str(err) == "interrupted":
+            query_str = sql
+            for k in sorted(values.keys(), reverse=True):
+                query_str = query_str.replace(f":{k}", str(values[k]) if not k.startswith("$") else f"'{values[k]}'")
+            context.log.warning("interrupted slow sqlite query:\n%s", query_str)
+        raise SQLiteInterruptedError(context.metrics)
 
 
 @measure
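The hunks above use only stock sqlite3 machinery: a progress handler that runs every 100 SQLite virtual-machine instructions, and Connection.interrupt(), which makes the running statement fail with OperationalError("interrupted"). A minimal standalone sketch of the same pattern (the helper name and demo query are illustrative, not part of the patch):

    import sqlite3
    import time

    def set_query_timeout(conn, timeout):
        stop_at = time.perf_counter() + timeout

        def interruptor():
            # invoked every 100 SQLite VM instructions while a query runs
            if time.perf_counter() >= stop_at:
                conn.interrupt()

        conn.set_progress_handler(interruptor, 100)

    conn = sqlite3.connect(':memory:')
    set_query_timeout(conn, 0.25)  # 250ms, matching the QUERY_TIMEOUT_MS default
    try:
        # an unbounded recursive CTE, guaranteed to outlive the timeout
        conn.execute('WITH RECURSIVE n(i) AS (SELECT 1 UNION ALL SELECT i + 1 FROM n) '
                     'SELECT count(*) FROM n').fetchall()
    except sqlite3.OperationalError as err:
        print('aborted:', err)  # aborted: interrupted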
diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py
index ff5d3af6d..5afb99530 100644
--- a/lbry/lbry/wallet/server/session.py
+++ b/lbry/lbry/wallet/server/session.py
@@ -11,7 +11,7 @@
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
 from aiohttp.http_websocket import WSMsgType, WSCloseCode
-from torba.rpc.jsonrpc import RPCError
+from torba.rpc.jsonrpc import RPCError, JSONRPC
 from torba.server.session import ElectrumX, SessionManager
 from torba.server import util
 
@@ -115,6 +115,8 @@ class LBRYSessionManager(SessionManager):
             'execution_time': 0,
             'query_time': 0,
             'query_count': 0,
+            'interrupted': 0,
+            'interrupted_query_values': [],
         }
         return self.command_metrics[command]
 
@@ -145,6 +147,18 @@ class LBRYSessionManager(SessionManager):
             else:
                 reader[key] += func_metrics[key]
 
+    def interrupted_command_error(self, command_name, elapsed, metrics, kwargs):
+        if self.env.track_metrics:
+            command = self.get_command_tracking_info(command_name)
+            command['finished'] += 1
+            command['interrupted'] += 1
+            command['total_time'] += elapsed
+            command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total'])
+            command['query_time'] += metrics['execute_query']['total']
+            command['query_count'] += metrics['execute_query']['calls']
+            if len(command['interrupted_query_values']) < 100:
+                command['interrupted_query_values'].append(kwargs)
+
     async def process_metrics(self):
         while self.running:
             commands, self.command_metrics = self.command_metrics, {}
@@ -160,7 +174,8 @@
         self.running = True
         args = dict(
             initializer=reader.initializer,
-            initargs=('claims.db', self.env.coin.NET, self.env.track_metrics)
+            initargs=(self.logger, 'claims.db', self.env.coin.NET, self.env.database_query_timeout,
+                      self.env.track_metrics)
         )
         if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
             self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
@@ -210,9 +225,16 @@ class LBRYElectrumX(ElectrumX):
         if self.env.track_metrics:
             self.session_mgr.start_command_tracking(name)
         start = time.perf_counter()
-        result = await asyncio.get_running_loop().run_in_executor(
-            self.session_mgr.query_executor, func, kwargs
-        )
+        try:
+            result = await asyncio.get_running_loop().run_in_executor(
+                self.session_mgr.query_executor, func, kwargs
+            )
+        except reader.SQLiteInterruptedError as error:
+            self.session_mgr.interrupted_command_error(
+                name, int((time.perf_counter() - start) * 1000), error.metrics, kwargs
+            )
+            raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
+
         if self.env.track_metrics:
             elapsed = int((time.perf_counter() - start) * 1000)
             (result, metrics) = result
@@ -228,7 +250,6 @@ class LBRYElectrumX(ElectrumX):
         elif cache_item.result is not None:
             self.session_mgr.cache_hit(query_name)
             return cache_item.result
-
         async with cache_item.lock:
             result = cache_item.result
             if result is None:
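With this change, a query that exceeds the timeout no longer surfaces as a generic internal error: the session re-raises it as RPCError(JSONRPC.QUERY_TIMEOUT, ...). Assuming standard JSON-RPC 2.0 framing (the id below is illustrative), a client would see a response shaped like:

    {"jsonrpc": "2.0", "id": 42, "error": {"code": -32000, "message": "sqlite query timed out"}}

Code -32000 sits in the range the JSON-RPC 2.0 spec reserves for implementation-defined server errors (-32000 to -32099), so it cannot collide with predefined protocol codes such as INTERNAL_ERROR (-32603).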
diff --git a/lbry/scripts/sqlite_perf_test.py b/lbry/scripts/sqlite_perf_test.py
index 85893c2bc..e3b5e61af 100644
--- a/lbry/scripts/sqlite_perf_test.py
+++ b/lbry/scripts/sqlite_perf_test.py
@@ -1,9 +1,12 @@
-import uvloop, asyncio, time, sys
+import uvloop, asyncio, time, sys, logging
 from concurrent.futures import ProcessPoolExecutor
 from lbry.wallet.server.db import reader
 
 db_path = '/tmp/wallet-server/claims.db'
+default_query_timeout = 0.25
+log = logging.getLogger(__name__)
+log.addHandler(logging.StreamHandler())
 
 
 async def run_times(executor, iterations, show=True):
@@ -40,7 +43,9 @@ async def run_times(executor, iterations, show=True):
 
 
 async def main():
-    executor = ProcessPoolExecutor(4, initializer=reader.initializer, initargs=(db_path, 'mainnet', True))
+    executor = ProcessPoolExecutor(
+        4, initializer=reader.initializer, initargs=(log, db_path, 'mainnet', default_query_timeout, True)
+    )
     await run_times(executor, 4, show=False)
     await run_times(executor, 1)
     await run_times(executor, 2**3)
diff --git a/lbry/tests/unit/wallet/server/test_sqldb.py b/lbry/tests/unit/wallet/server/test_sqldb.py
index 87c102c43..574d13c59 100644
--- a/lbry/tests/unit/wallet/server/test_sqldb.py
+++ b/lbry/tests/unit/wallet/server/test_sqldb.py
@@ -1,6 +1,7 @@
 import unittest
 import ecdsa
 import hashlib
+import logging
 from binascii import hexlify
 from torba.client.constants import COIN, NULL_HASH32
 
@@ -36,6 +37,7 @@ class OldWalletServerTransaction:
 
 
 class TestSQLDB(unittest.TestCase):
+    query_timeout = 0.25
 
     def setUp(self):
         self.first_sync = False
@@ -44,7 +46,7 @@ class TestSQLDB(unittest.TestCase):
         db_url = 'file:test_sqldb?mode=memory&cache=shared'
         self.sql = writer.SQLDB(self, db_url)
         self.addCleanup(self.sql.close)
-        reader.initializer(db_url, 'regtest')
+        reader.initializer(logging.getLogger(__name__), db_url, 'regtest', self.query_timeout)
         self.addCleanup(reader.cleanup)
         self.timer = Timer('BlockProcessor')
         self.sql.open()
diff --git a/torba/torba/rpc/jsonrpc.py b/torba/torba/rpc/jsonrpc.py
index dc80b61a4..5e908cd02 100644
--- a/torba/torba/rpc/jsonrpc.py
+++ b/torba/torba/rpc/jsonrpc.py
@@ -165,6 +165,8 @@ class JSONRPC(object):
     METHOD_NOT_FOUND = -32601
     INVALID_ARGS = -32602
     INTERNAL_ERROR = -32603
+    QUERY_TIMEOUT = -32000
+
     # Codes specific to this library
     ERROR_CODE_UNAVAILABLE = -100
 
diff --git a/torba/torba/server/env.py b/torba/torba/server/env.py
index 2d2df3138..d2d8f83ee 100644
--- a/torba/torba/server/env.py
+++ b/torba/torba/server/env.py
@@ -86,6 +86,7 @@ class Env:
         self.identities = [identity
                            for identity in (clearnet_identity, tor_identity)
                            if identity is not None]
+        self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 250)) / 1000.0
 
     @classmethod
     def default(cls, envvar, default):

From 478ae7e6578137c54a6d31326773d59217a694ae Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 16 Jul 2019 12:26:50 -0400
Subject: [PATCH 2/3] add claim search performance script

---
 lbry/scripts/claim_search_performance.py | 117 +++++++++++++++++++++++
 1 file changed, 117 insertions(+)
 create mode 100644 lbry/scripts/claim_search_performance.py

diff --git a/lbry/scripts/claim_search_performance.py b/lbry/scripts/claim_search_performance.py
new file mode 100644
index 000000000..50e4ba19d
--- /dev/null
+++ b/lbry/scripts/claim_search_performance.py
@@ -0,0 +1,117 @@
+import sys
+import os
+import time
+import asyncio
+import logging
+from concurrent.futures.process import ProcessPoolExecutor
+from lbry.wallet.server.db.reader import search_to_bytes, initializer
+from lbry.wallet.ledger import MainNetLedger
+
+log = logging.getLogger(__name__)
+log.addHandler(logging.StreamHandler())
+log.setLevel(logging.INFO)
+
+DEFAULT_ANY_TAGS = [
+    'blockchain',
+    'news',
+    'learning',
+    'technology',
+    'automotive',
+    'economics',
+    'food',
+    'science',
+    'art',
+    'nature'
+]
+
+MATURE_TAGS = [
+    'porn',
+    'nsfw',
+    'mature',
+    'xxx'
+]
+
+ORDER_BY = [
+    [
+        "trending_global",
+        "trending_mixed",
+    ],
+    [
+        "release_time"
+    ],
+    [
+        "effective_amount"
+    ]
+]
+
+
+def get_args(limit=20):
+    args = []
+    any_tags_combinations = [DEFAULT_ANY_TAGS, []]
+    not_tags_combinations = [MATURE_TAGS, []]
+    for no_totals in [True]:
+        for offset in [0, 20, 40, 60, 80, 100, 1000, 2000, 3000]:
+            for any_tags in any_tags_combinations:
+                for not_tags in not_tags_combinations:
+                    for order_by in ORDER_BY:
+                        kw = {
+                            'order_by': order_by,
+                            'offset': offset,
+                            'limit': limit,
+                            'no_totals': no_totals
+                        }
+                        if not_tags:
+                            kw['not_tags'] = not_tags
+                        if any_tags:
+                            kw['any_tags'] = any_tags
+                        args.append(kw)
+    print(len(args), "argument combinations")
+    return args
+
+
+def _search(kwargs):
+    start = time.time()
+    msg = f"offset={kwargs['offset']}, limit={kwargs['limit']}, no_totals={kwargs['no_totals']}, not_tags={kwargs.get('not_tags')}, any_tags={kwargs.get('any_tags')}, order_by={kwargs['order_by']}"
+    try:
+        search_to_bytes(kwargs)
+        t = time.time() - start
+        return t, f"{t} - {msg}"
+    except Exception as err:
+        return -1, f"failed: error={str(type(err))}({str(err)}) - {msg}"
+
+
+async def search(executor, kwargs):
+    try:
+        return await asyncio.get_running_loop().run_in_executor(
+            executor, _search, kwargs
+        )
+    except Exception as err:
+        return f"failed (err={str(type(err))}({err}))- {kwargs}"
+
+
+async def main(db_path, max_query_time):
+    args = dict(initializer=initializer, initargs=(log, db_path, 'mainnet', 0.25))
+    workers = max(os.cpu_count(), 4)
+    log.info(f"using {workers} reader processes")
+    query_executor = ProcessPoolExecutor(workers, **args)
+    tasks = [search(query_executor, constraints) for constraints in get_args()]
+    try:
+        results = await asyncio.gather(*tasks)
+        times = {msg: ts for ts, msg in results}
+        log.info("\n".join(sorted(filter(lambda msg: times[msg] > max_query_time, times.keys()), key=lambda msg: times[msg])))
+    finally:
+        query_executor.shutdown()
+
+
+if __name__ == "__main__":
+    args = sys.argv[1:]
+    if len(args) >= 1:
+        db_path = args[0]
+    else:
+        db_path = os.path.expanduser('~/claims.db')
+    if len(args) >= 2:
+        max_query_time = float(args[1])
+    else:
+        max_query_time = -3
+
+    asyncio.run(main(db_path, max_query_time))
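The script takes the claims database path and a time threshold as positional arguments and logs every search whose wall time exceeded the threshold, slowest last; with the default threshold of -3, everything is printed, including failures, which _search reports with a sentinel time of -1. A typical invocation (path illustrative):

    python lbry/scripts/claim_search_performance.py ~/claims.db 0.25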
From 8fbf82f4ea1d11bca2bed4669383e16ce6a8e7c2 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 16 Jul 2019 17:02:40 -0400
Subject: [PATCH 3/3] filter mutually exclusive tags

---
 lbry/lbry/wallet/server/db/reader.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py
index e09ea5673..e0c9ac450 100644
--- a/lbry/lbry/wallet/server/db/reader.py
+++ b/lbry/lbry/wallet/server/db/reader.py
@@ -422,7 +422,13 @@ def resolve_url(raw_url):
 
 @measure
 def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
-    any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
+    any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
+    all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
+    not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
+
+    all_items = {item for item in all_items if item not in not_items}
+    any_items = {item for item in any_items if item not in not_items}
+
     if any_items:
         constraints.update({
             f'$any_{attr}{i}': item for i, item in enumerate(any_items)
@@ -443,7 +449,6 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
         )
     """
-    all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
     if all_items:
         constraints[f'$all_{attr}_count'] = len(all_items)
         constraints.update({
             f'$all_{attr}{i}': item for i, item in enumerate(all_items)
@@ -466,7 +471,6 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
         )
     """
-    not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
     if not_items:
         constraints.update({
             f'$not_{attr}{i}': item for i, item in enumerate(not_items)
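The two set comprehensions reconcile the any/all constraints with the exclusion list before any SQL parameters are generated, so a single query can no longer both require and exclude the same tag. The effect, with illustrative values:

    any_items = {'science', 'porn'}
    not_items = {'porn', 'nsfw', 'mature', 'xxx'}
    any_items = {item for item in any_items if item not in not_items}
    # any_items == {'science'}: the excluded tag no longer appears as a
    # positive match condition

Note that the filter is one-directional: not_items itself is left intact, so the exclusion constraints are still applied in full.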