interrupt slow queries and log them for debugging

- add QUERY_TIMEOUT_MS env variable

- return sqlite timeout errors as JSON-RPC error code -32000
This commit is contained in:
Jack Robison 2019-07-16 12:26:28 -04:00
parent 16fb716872
commit d319761483
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 79 additions and 15 deletions

View file

@ -1,6 +1,7 @@
import time
import struct
import sqlite3
import logging
from typing import Tuple, List, Dict, Union, Type, Optional
from binascii import unhexlify
from decimal import Decimal
@ -18,6 +19,12 @@ from lbry.wallet.ledger import BaseLedger, MainNetLedger, RegTestLedger
from .common import CLAIM_TYPES, STREAM_TYPES
class SQLiteInterruptedError(sqlite3.OperationalError):
    """Raised when a sqlite query is aborted for exceeding the query timeout.

    Carries the per-query metrics collected up to the point of interruption
    so the caller can account for the failed query.
    """

    def __init__(self, metrics):
        # Snapshot of the reader's metrics at interruption time.
        self.metrics = metrics
        super().__init__('sqlite query interrupted')
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
@ -59,6 +66,8 @@ class ReaderState:
metrics: Dict
is_tracking_metrics: bool
ledger: Type[BaseLedger]
query_timeout: float
log: logging.Logger
def close(self):
self.db.close()
@ -67,17 +76,31 @@ class ReaderState:
self.stack = []
self.metrics = {}
def set_query_timeout(self):
    """Arm a deadline for the next query on this connection.

    Installs a sqlite progress handler, invoked every 100 VM opcodes,
    that calls interrupt() on the connection once ``self.query_timeout``
    seconds have elapsed, making the running statement fail with
    OperationalError('interrupted').
    """
    deadline = time.perf_counter() + self.query_timeout

    def check_deadline():
        if time.perf_counter() >= deadline:
            self.db.interrupt()

    self.db.set_progress_handler(check_deadline, 100)
ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')


def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
    """Per-worker setup: open the claims db and stash a ReaderState in ``ctx``.

    Runs once in each executor worker (process or thread); query functions
    later fetch their worker-local connection through the ContextVar.
    """
    db = sqlite3.connect(_path, isolation_level=None, uri=True)
    db.row_factory = sqlite3.Row
    state = ReaderState(
        db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
        ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger,
        query_timeout=query_timeout, log=log,
    )
    ctx.set(state)
def cleanup():
@ -139,7 +162,17 @@ def encode_result(result):
@measure
def execute_query(sql, values) -> List:
    """Run ``sql`` with named ``values`` under the per-worker query timeout.

    Raises:
        SQLiteInterruptedError: if sqlite aborted the query for exceeding
            the timeout; carries the worker's current metrics.
        sqlite3.OperationalError: re-raised unchanged for any other
            operational failure.
    """
    context = ctx.get()
    context.set_query_timeout()
    try:
        return context.db.execute(sql, values).fetchall()
    except sqlite3.OperationalError as err:
        if str(err) == "interrupted":
            # Substitute bound values into the SQL purely for the log line.
            # Longest names first so ':limit10' isn't clobbered by ':limit1'.
            query_str = sql
            for k in sorted(values.keys(), reverse=True):
                query_str = query_str.replace(
                    f":{k}", str(values[k]) if not k.startswith("$") else f"'{values[k]}'"
                )
            context.log.warning("interrupted slow sqlite query:\n%s", query_str)
            raise SQLiteInterruptedError(context.metrics) from err
        # Fix: as written, other OperationalErrors fell out of the except
        # block and the function silently returned None; propagate them.
        raise
@measure

View file

@ -11,7 +11,7 @@ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
from aiohttp.http_websocket import WSMsgType, WSCloseCode
from torba.rpc.jsonrpc import RPCError
from torba.rpc.jsonrpc import RPCError, JSONRPC
from torba.server.session import ElectrumX, SessionManager
from torba.server import util
@ -115,6 +115,8 @@ class LBRYSessionManager(SessionManager):
'execution_time': 0,
'query_time': 0,
'query_count': 0,
'interrupted': 0,
'interrupted_query_values': [],
}
return self.command_metrics[command]
@ -145,6 +147,18 @@ class LBRYSessionManager(SessionManager):
else:
reader[key] += func_metrics[key]
def interrupted_command_error(self, command_name, elapsed, metrics, kwargs):
    """Record a query that was killed by the sqlite timeout.

    Folds the elapsed wall time and the reader's metrics into the
    per-command counters and retains up to 100 offending argument sets
    for later inspection. No-op unless metric tracking is enabled.
    """
    if not self.env.track_metrics:
        return
    command = self.get_command_tracking_info(command_name)
    query_time = metrics['execute_query']['total']
    command['finished'] += 1
    command['interrupted'] += 1
    command['total_time'] += elapsed
    command['execution_time'] += metrics[command_name]['total'] - query_time
    command['query_time'] += query_time
    command['query_count'] += metrics['execute_query']['calls']
    # Cap retained examples so a flood of slow queries can't grow this
    # list without bound.
    if len(command['interrupted_query_values']) < 100:
        command['interrupted_query_values'].append(kwargs)
async def process_metrics(self):
while self.running:
commands, self.command_metrics = self.command_metrics, {}
@ -160,7 +174,8 @@ class LBRYSessionManager(SessionManager):
self.running = True
args = dict(
initializer=reader.initializer,
initargs=('claims.db', self.env.coin.NET, self.env.track_metrics)
initargs=(self.logger, 'claims.db', self.env.coin.NET, self.env.database_query_timeout,
self.env.track_metrics)
)
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
@ -210,9 +225,16 @@ class LBRYElectrumX(ElectrumX):
if self.env.track_metrics:
self.session_mgr.start_command_tracking(name)
start = time.perf_counter()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
try:
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
except reader.SQLiteInterruptedError as error:
self.session_mgr.interrupted_command_error(
name, int((time.perf_counter() - start) * 1000), error.metrics, kwargs
)
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
if self.env.track_metrics:
elapsed = int((time.perf_counter() - start) * 1000)
(result, metrics) = result
@ -228,7 +250,6 @@ class LBRYElectrumX(ElectrumX):
elif cache_item.result is not None:
self.session_mgr.cache_hit(query_name)
return cache_item.result
async with cache_item.lock:
result = cache_item.result
if result is None:

View file

@ -1,9 +1,12 @@
import uvloop, asyncio, time, sys
import uvloop, asyncio, time, sys, logging
from concurrent.futures import ProcessPoolExecutor
from lbry.wallet.server.db import reader
db_path = '/tmp/wallet-server/claims.db'
default_query_timout = 0.25
log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())
async def run_times(executor, iterations, show=True):
@ -40,7 +43,9 @@ async def run_times(executor, iterations, show=True):
async def main():
executor = ProcessPoolExecutor(4, initializer=reader.initializer, initargs=(db_path, 'mainnet', True))
executor = ProcessPoolExecutor(
4, initializer=reader.initializer, initargs=(log, db_path, 'mainnet', default_query_timout, True)
)
await run_times(executor, 4, show=False)
await run_times(executor, 1)
await run_times(executor, 2**3)

View file

@ -1,6 +1,7 @@
import unittest
import ecdsa
import hashlib
import logging
from binascii import hexlify
from torba.client.constants import COIN, NULL_HASH32
@ -36,6 +37,7 @@ class OldWalletServerTransaction:
class TestSQLDB(unittest.TestCase):
query_timeout = 0.25
def setUp(self):
self.first_sync = False
@ -44,7 +46,7 @@ class TestSQLDB(unittest.TestCase):
db_url = 'file:test_sqldb?mode=memory&cache=shared'
self.sql = writer.SQLDB(self, db_url)
self.addCleanup(self.sql.close)
reader.initializer(db_url, 'regtest')
reader.initializer(logging.getLogger(__name__), db_url, 'regtest', self.query_timeout)
self.addCleanup(reader.cleanup)
self.timer = Timer('BlockProcessor')
self.sql.open()

View file

@ -165,6 +165,8 @@ class JSONRPC(object):
METHOD_NOT_FOUND = -32601
INVALID_ARGS = -32602
INTERNAL_ERROR = -32603
QUERY_TIMEOUT = -32000
# Codes specific to this library
ERROR_CODE_UNAVAILABLE = -100

View file

@ -86,6 +86,7 @@ class Env:
self.identities = [identity
for identity in (clearnet_identity, tor_identity)
if identity is not None]
self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 250)) / 1000.0
@classmethod
def default(cls, envvar, default):