Merge pull request #2320 from lbryio/interrupt-slow-queries

Interrupt and log slow wallet server queries
Jack Robison 2019-07-16 20:50:37 -04:00 committed by GitHub
commit 578f8f4062
7 changed files with 203 additions and 18 deletions
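
For context: the mechanism this PR wires into the wallet server is sqlite3's progress handler, registered to fire every 100 virtual-machine instructions and to call `Connection.interrupt()` once a deadline passes, which aborts the running statement with `OperationalError("interrupted")`. A minimal standalone sketch of that technique (the table and query are invented for illustration):

```python
import time
import sqlite3

def set_query_timeout(db: sqlite3.Connection, timeout: float):
    # the handler runs every 100 sqlite VM instructions; interrupt() makes
    # the active statement raise sqlite3.OperationalError("interrupted")
    stop_at = time.perf_counter() + timeout
    db.set_progress_handler(
        lambda: db.interrupt() if time.perf_counter() >= stop_at else None, 100)

db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE t (n INTEGER)')
db.executemany('INSERT INTO t VALUES (?)', ((i,) for i in range(2000)))
set_query_timeout(db, 0.25)  # 250 ms, the default this PR introduces
try:
    db.execute('SELECT COUNT(*) FROM t a, t b, t c').fetchone()  # deliberately slow
except sqlite3.OperationalError as err:
    print('interrupted slow query:', err)
```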


@@ -1,6 +1,7 @@
 import time
 import struct
 import sqlite3
+import logging
 from typing import Tuple, List, Dict, Union, Type, Optional
 from binascii import unhexlify
 from decimal import Decimal
@@ -18,6 +19,12 @@ from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger
 from .common import CLAIM_TYPES, STREAM_TYPES
 
 
+class SQLiteInterruptedError(sqlite3.OperationalError):
+    def __init__(self, metrics):
+        super().__init__('sqlite query interrupted')
+        self.metrics = metrics
+
+
 ATTRIBUTE_ARRAY_MAX_LENGTH = 100
@@ -59,6 +66,8 @@ class ReaderState:
     metrics: Dict
     is_tracking_metrics: bool
     ledger: Type[BaseLedger]
+    query_timeout: float
+    log: logging.Logger
 
     def close(self):
         self.db.close()
@@ -67,17 +76,31 @@ class ReaderState:
         self.stack = []
         self.metrics = {}
 
+    def set_query_timeout(self):
+        stop_at = time.perf_counter() + self.query_timeout
+
+        def interruptor():
+            if time.perf_counter() >= stop_at:
+                self.db.interrupt()
+            return
+
+        self.db.set_progress_handler(interruptor, 100)
+
 
 ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
 
 
-def initializer(_path, _ledger_name, _measure=False):
+def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
     db = sqlite3.connect(_path, isolation_level=None, uri=True)
     db.row_factory = sqlite3.Row
-    ctx.set(ReaderState(
-        db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
-        ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger
-    ))
+    ctx.set(
+        ReaderState(
+            db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
+            ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger,
+            query_timeout=query_timeout, log=log
+        )
+    )
 
 
 def cleanup():
@@ -139,7 +162,17 @@ def encode_result(result):
 
 @measure
 def execute_query(sql, values) -> List:
-    return ctx.get().db.execute(sql, values).fetchall()
+    context = ctx.get()
+    context.set_query_timeout()
+    try:
+        return context.db.execute(sql, values).fetchall()
+    except sqlite3.OperationalError as err:
+        if str(err) == "interrupted":
+            query_str = sql
+            for k in sorted(values.keys(), reverse=True):
+                query_str = query_str.replace(f":{k}", str(values[k]) if not k.startswith("$") else f"'{values[k]}'")
+            context.log.warning("interrupted slow sqlite query:\n%s", query_str)
+            raise SQLiteInterruptedError(context.metrics)
+        raise
 
 
 @measure
@@ -389,7 +422,13 @@ def resolve_url(raw_url):
 
 @measure
 def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
-    any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
+    any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
+    all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
+    not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
+
+    all_items = {item for item in all_items if item not in not_items}
+    any_items = {item for item in any_items if item not in not_items}
+
     if any_items:
         constraints.update({
             f'$any_{attr}{i}': item for i, item in enumerate(any_items)
@@ -410,7 +449,6 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
         )
     """
-    all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
     if all_items:
         constraints[f'$all_{attr}_count'] = len(all_items)
         constraints.update({
@@ -433,7 +471,6 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
         )
     """
-    not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
     if not_items:
         constraints.update({
             f'$not_{attr}{i}': item for i, item in enumerate(not_items)
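
One subtlety in the logging path of `execute_query` above: named parameters are substituted in reverse-sorted key order, so a key that is a prefix of another (e.g. `$any_tag1` vs `$any_tag10`) is replaced after the longer one and never clobbers it; the interpolated string is only written to the log, never executed. A small illustration with made-up values:

```python
# hypothetical constraint values, mirroring the reader's naming scheme
values = {'$any_tag1': 'news', '$any_tag10': 'art', 'limit': 20}
sql = "SELECT ... WHERE tag IN (:$any_tag1, :$any_tag10) LIMIT :limit"
query_str = sql
for k in sorted(values.keys(), reverse=True):  # 'limit', '$any_tag10', '$any_tag1'
    query_str = query_str.replace(f":{k}", str(values[k]) if not k.startswith("$") else f"'{values[k]}'")
print(query_str)  # SELECT ... WHERE tag IN ('news', 'art') LIMIT 20
```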


@@ -11,7 +11,7 @@ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
 from aiohttp.http_websocket import WSMsgType, WSCloseCode
 
-from torba.rpc.jsonrpc import RPCError
+from torba.rpc.jsonrpc import RPCError, JSONRPC
 from torba.server.session import ElectrumX, SessionManager
 from torba.server import util
@@ -115,6 +115,8 @@ class LBRYSessionManager(SessionManager):
             'execution_time': 0,
             'query_time': 0,
             'query_count': 0,
+            'interrupted': 0,
+            'interrupted_query_values': [],
         }
         return self.command_metrics[command]
@@ -145,6 +147,18 @@ class LBRYSessionManager(SessionManager):
             else:
                 reader[key] += func_metrics[key]
 
+    def interrupted_command_error(self, command_name, elapsed, metrics, kwargs):
+        if self.env.track_metrics:
+            command = self.get_command_tracking_info(command_name)
+            command['finished'] += 1
+            command['interrupted'] += 1
+            command['total_time'] += elapsed
+            command['execution_time'] += (metrics[command_name]['total'] - metrics['execute_query']['total'])
+            command['query_time'] += metrics['execute_query']['total']
+            command['query_count'] += metrics['execute_query']['calls']
+            if len(command['interrupted_query_values']) < 100:
+                command['interrupted_query_values'].append(kwargs)
+
     async def process_metrics(self):
         while self.running:
             commands, self.command_metrics = self.command_metrics, {}
@@ -160,7 +174,8 @@ class LBRYSessionManager(SessionManager):
         self.running = True
         args = dict(
             initializer=reader.initializer,
-            initargs=('claims.db', self.env.coin.NET, self.env.track_metrics)
+            initargs=(self.logger, 'claims.db', self.env.coin.NET, self.env.database_query_timeout,
+                      self.env.track_metrics)
         )
         if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
             self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
@@ -210,9 +225,16 @@ class LBRYElectrumX(ElectrumX):
         if self.env.track_metrics:
             self.session_mgr.start_command_tracking(name)
         start = time.perf_counter()
-        result = await asyncio.get_running_loop().run_in_executor(
-            self.session_mgr.query_executor, func, kwargs
-        )
+        try:
+            result = await asyncio.get_running_loop().run_in_executor(
+                self.session_mgr.query_executor, func, kwargs
+            )
+        except reader.SQLiteInterruptedError as error:
+            self.session_mgr.interrupted_command_error(
+                name, int((time.perf_counter() - start) * 1000), error.metrics, kwargs
+            )
+            raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
         if self.env.track_metrics:
             elapsed = int((time.perf_counter() - start) * 1000)
             (result, metrics) = result
@@ -228,7 +250,6 @@ class LBRYElectrumX(ElectrumX):
         elif cache_item.result is not None:
             self.session_mgr.cache_hit(query_name)
             return cache_item.result
-
         async with cache_item.lock:
             result = cache_item.result
             if result is None:


@@ -0,0 +1,117 @@
+import sys
+import os
+import time
+import asyncio
+import logging
+from concurrent.futures.process import ProcessPoolExecutor
+from lbry.wallet.server.db.reader import search_to_bytes, initializer
+from lbry.wallet.ledger import MainNetLedger
+
+log = logging.getLogger(__name__)
+log.addHandler(logging.StreamHandler())
+log.setLevel(logging.INFO)
+
+DEFAULT_ANY_TAGS = [
+    'blockchain',
+    'news',
+    'learning',
+    'technology',
+    'automotive',
+    'economics',
+    'food',
+    'science',
+    'art',
+    'nature'
+]
+
+MATURE_TAGS = [
+    'porn',
+    'nsfw',
+    'mature',
+    'xxx'
+]
+
+ORDER_BY = [
+    [
+        "trending_global",
+        "trending_mixed",
+    ],
+    [
+        "release_time"
+    ],
+    [
+        "effective_amount"
+    ]
+]
+
+
+def get_args(limit=20):
+    args = []
+    any_tags_combinations = [DEFAULT_ANY_TAGS, []]
+    not_tags_combinations = [MATURE_TAGS, []]
+    for no_totals in [True]:
+        for offset in [0, 20, 40, 60, 80, 100, 1000, 2000, 3000]:
+            for any_tags in any_tags_combinations:
+                for not_tags in not_tags_combinations:
+                    for order_by in ORDER_BY:
+                        kw = {
+                            'order_by': order_by,
+                            'offset': offset,
+                            'limit': limit,
+                            'no_totals': no_totals
+                        }
+                        if not_tags:
+                            kw['not_tags'] = not_tags
+                        if any_tags:
+                            kw['any_tags'] = any_tags
+                        args.append(kw)
+    print(len(args), "argument combinations")
+    return args
+
+
+def _search(kwargs):
+    start = time.time()
+    msg = f"offset={kwargs['offset']}, limit={kwargs['limit']}, no_totals={kwargs['no_totals']}, not_tags={kwargs.get('not_tags')}, any_tags={kwargs.get('any_tags')}, order_by={kwargs['order_by']}"
+    try:
+        search_to_bytes(kwargs)
+        t = time.time() - start
+        return t, f"{t} - {msg}"
+    except Exception as err:
+        return -1, f"failed: error={str(type(err))}({str(err)}) - {msg}"
+
+
+async def search(executor, kwargs):
+    try:
+        return await asyncio.get_running_loop().run_in_executor(
+            executor, _search, kwargs
+        )
+    except Exception as err:
+        return f"failed (err={str(type(err))}({err}))- {kwargs}"
+
+
+async def main(db_path, max_query_time):
+    args = dict(initializer=initializer, initargs=(log, db_path, MainNetLedger, 0.25))
+    workers = max(os.cpu_count(), 4)
+    log.info(f"using {workers} reader processes")
+    query_executor = ProcessPoolExecutor(workers, **args)
+    tasks = [search(query_executor, constraints) for constraints in get_args()]
+    try:
+        results = await asyncio.gather(*tasks)
+        times = {msg: ts for ts, msg in results}
+        log.info("\n".join(sorted(filter(lambda msg: times[msg] > max_query_time, times.keys()), key=lambda msg: times[msg])))
+    finally:
+        query_executor.shutdown()
+
+
+if __name__ == "__main__":
+    args = sys.argv[1:]
+    if len(args) >= 1:
+        db_path = args[0]
+    else:
+        db_path = os.path.expanduser('~/claims.db')
+    if len(args) >= 2:
+        max_query_time = float(args[1])
+    else:
+        max_query_time = -3
+    asyncio.run(main(db_path, max_query_time))
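
This new benchmark script takes an optional claims db path (default `~/claims.db`) and an optional threshold in seconds as positional arguments; only queries slower than the threshold are logged, and the default of `-3` also surfaces failures, which `_search` reports with a sentinel time of `-1`. Note that the reader pool here is initialized with a hard-coded 0.25 s query timeout, so searches exceeding it come back as interrupted failures rather than slow timings.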


@@ -1,9 +1,12 @@
-import uvloop, asyncio, time, sys
+import uvloop, asyncio, time, sys, logging
 from concurrent.futures import ProcessPoolExecutor
 from lbry.wallet.server.db import reader
 
 db_path = '/tmp/wallet-server/claims.db'
+default_query_timout = 0.25
+log = logging.getLogger(__name__)
+log.addHandler(logging.StreamHandler())
 
 
 async def run_times(executor, iterations, show=True):
@@ -40,7 +43,9 @@ async def run_times(executor, iterations, show=True):
 
 async def main():
-    executor = ProcessPoolExecutor(4, initializer=reader.initializer, initargs=(db_path, 'mainnet', True))
+    executor = ProcessPoolExecutor(
+        4, initializer=reader.initializer, initargs=(log, db_path, 'mainnet', default_query_timout, True)
+    )
     await run_times(executor, 4, show=False)
     await run_times(executor, 1)
     await run_times(executor, 2**3)


@@ -1,6 +1,7 @@
 import unittest
 import ecdsa
 import hashlib
+import logging
 from binascii import hexlify
 from torba.client.constants import COIN, NULL_HASH32
@@ -36,6 +37,7 @@ class OldWalletServerTransaction:
 
 class TestSQLDB(unittest.TestCase):
+    query_timeout = 0.25
 
     def setUp(self):
         self.first_sync = False
@@ -44,7 +46,7 @@ class TestSQLDB(unittest.TestCase):
         db_url = 'file:test_sqldb?mode=memory&cache=shared'
         self.sql = writer.SQLDB(self, db_url)
         self.addCleanup(self.sql.close)
-        reader.initializer(db_url, 'regtest')
+        reader.initializer(logging.getLogger(__name__), db_url, 'regtest', self.query_timeout)
         self.addCleanup(reader.cleanup)
         self.timer = Timer('BlockProcessor')
         self.sql.open()


@@ -165,6 +165,8 @@ class JSONRPC(object):
     METHOD_NOT_FOUND = -32601
     INVALID_ARGS = -32602
     INTERNAL_ERROR = -32603
+    QUERY_TIMEOUT = -32000
+
     # Codes specific to this library
     ERROR_CODE_UNAVAILABLE = -100


@@ -86,6 +86,7 @@ class Env:
         self.identities = [identity
                            for identity in (clearnet_identity, tor_identity)
                            if identity is not None]
+        self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 250)) / 1000.0
 
     @classmethod
     def default(cls, envvar, default):
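
Finally, the timeout itself comes from the environment: `QUERY_TIMEOUT_MS` (default 250) is read as an integer and stored on the `Env` in seconds. A quick sketch of the conversion, assuming `Env.integer` reads the variable as an int, per the hunk above:

```python
import os

# e.g. run the wallet server with QUERY_TIMEOUT_MS=500 for a half-second budget
ms = int(os.environ.get('QUERY_TIMEOUT_MS', 250))
database_query_timeout = float(ms) / 1000.0  # 250 -> 0.25 s
# this value reaches every reader process via reader.initializer(..., query_timeout, ...)
# and bounds each sqlite query's wall-clock time
```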