Merge pull request #2937 from lbryio/daemon-prometheus

fix database lockup and add prometheus db metrics
This commit is contained in:
Jack Robison 2020-05-04 13:51:12 -04:00 committed by GitHub
commit ef02d776ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 292 additions and 145 deletions

View file

@ -19,7 +19,7 @@ from functools import wraps, partial
import ecdsa
import base58
from aiohttp import web
from prometheus_client import generate_latest as prom_generate_latest
from prometheus_client import generate_latest as prom_generate_latest, Gauge, Histogram, Counter
from google.protobuf.message import DecodeError
from lbry.wallet import (
Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic,
@ -290,6 +290,11 @@ class JSONRPCServerType(type):
return klass
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
class Daemon(metaclass=JSONRPCServerType):
"""
LBRYnet daemon, a jsonrpc interface to lbry functions
@ -297,6 +302,28 @@ class Daemon(metaclass=JSONRPCServerType):
callable_methods: dict
deprecated_methods: dict
pending_requests_metric = Gauge(
"pending_requests", "Number of running api requests", namespace="daemon_api",
labelnames=("method",)
)
requests_count_metric = Counter(
"requests_count", "Number of requests received", namespace="daemon_api",
labelnames=("method",)
)
failed_request_metric = Counter(
"failed_request_count", "Number of failed requests", namespace="daemon_api",
labelnames=("method",)
)
cancelled_request_metric = Counter(
"cancelled_request_count", "Number of cancelled requests", namespace="daemon_api",
labelnames=("method",)
)
response_time_metric = Histogram(
"response_time", "Response times", namespace="daemon_api", buckets=HISTOGRAM_BUCKETS,
labelnames=("method",)
)
def __init__(self, conf: Config, component_manager: typing.Optional[ComponentManager] = None):
self.conf = conf
self.platform_info = system_info.get_platform()
@ -457,7 +484,6 @@ class Daemon(metaclass=JSONRPCServerType):
log.info("Starting LBRYNet Daemon")
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
log.info("Platform: %s", json.dumps(self.platform_info, indent=2))
self.need_connection_status_refresh.set()
self._connection_status_task = self.component_manager.loop.create_task(
self.keep_connection_status_up_to_date()
@ -663,20 +689,27 @@ class Daemon(metaclass=JSONRPCServerType):
JSONRPCError.CODE_INVALID_PARAMS,
params_error_message,
)
self.pending_requests_metric.labels(method=function_name).inc()
self.requests_count_metric.labels(method=function_name).inc()
start = time.perf_counter()
try:
result = method(self, *_args, **_kwargs)
if asyncio.iscoroutine(result):
result = await result
return result
except asyncio.CancelledError:
self.cancelled_request_metric.labels(method=function_name).inc()
log.info("cancelled API call for: %s", function_name)
raise
except Exception as e: # pylint: disable=broad-except
self.failed_request_metric.labels(method=function_name).inc()
log.exception("error handling api request")
return JSONRPCError.create_command_exception(
command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc()
)
finally:
self.pending_requests_metric.labels(method=function_name).dec()
self.response_time_metric.labels(method=function_name).observe(time.perf_counter() - start)
def _verify_method_is_callable(self, function_path):
if function_path not in self.callable_methods:

32
lbry/prometheus.py Normal file
View file

@ -0,0 +1,32 @@
import logging
from aiohttp import web
from prometheus_client import generate_latest as prom_generate_latest
class PrometheusServer:
def __init__(self, logger=None):
self.runner = None
self.logger = logger or logging.getLogger(__name__)
async def start(self, interface: str, port: int):
prom_app = web.Application()
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
self.runner = web.AppRunner(prom_app)
await self.runner.setup()
metrics_site = web.TCPSite(self.runner, interface, port, shutdown_timeout=.5)
await metrics_site.start()
self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
async def handle_metrics_get_request(self, request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),
content_type='text/plain; version=0.0.4'
)
except Exception:
self.logger.exception('could not generate prometheus data')
raise
async def stop(self):
await self.runner.cleanup()

View file

@ -3,6 +3,7 @@ import codecs
import datetime
import random
import socket
import time
import string
import sys
import json
@ -282,3 +283,25 @@ async def get_external_ip() -> typing.Optional[str]: # used if upnp is disabled
def is_running_from_bundle():
# see https://pyinstaller.readthedocs.io/en/stable/runtime-information.html
return getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
class LockWithMetrics(asyncio.Lock):
def __init__(self, acquire_metric, held_time_metric, loop=None):
super().__init__(loop=loop)
self._acquire_metric = acquire_metric
self._lock_held_time_metric = held_time_metric
self._lock_acquired_time = None
async def acquire(self):
start = time.perf_counter()
try:
return await super().acquire()
finally:
self._lock_acquired_time = time.perf_counter()
self._acquire_metric.observe(self._lock_acquired_time - start)
def release(self):
try:
return super().release()
finally:
self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)

View file

@ -10,6 +10,8 @@ from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional
from datetime import date
from prometheus_client import Gauge, Counter, Histogram
from lbry.utils import LockWithMetrics
from .bip32 import PubKey
from .transaction import Transaction, Output, OutputScript, TXRefImmutable
@ -20,6 +22,10 @@ from .util import date_to_julian_day
log = logging.getLogger(__name__)
sqlite3.enable_callback_tracebacks(True)
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
@dataclass
class ReaderProcessState:
@ -64,15 +70,36 @@ else:
class AIOSQLite:
reader_executor: ReaderExecutorClass
waiting_writes_metric = Gauge(
"waiting_writes_count", "Number of waiting db writes", namespace="daemon_database"
)
waiting_reads_metric = Gauge(
"waiting_reads_count", "Number of waiting db writes", namespace="daemon_database"
)
write_count_metric = Counter(
"write_count", "Number of database writes", namespace="daemon_database"
)
read_count_metric = Counter(
"read_count", "Number of database reads", namespace="daemon_database"
)
acquire_write_lock_metric = Histogram(
f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database", buckets=HISTOGRAM_BUCKETS
)
held_write_lock_metric = Histogram(
f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database",
buckets=HISTOGRAM_BUCKETS
)
def __init__(self):
# has to be single threaded as there is no mapping of thread:connection
self.writer_executor = ThreadPoolExecutor(max_workers=1)
self.writer_connection: Optional[sqlite3.Connection] = None
self._closing = False
self.query_count = 0
self.write_lock = asyncio.Lock()
self.write_lock = LockWithMetrics(self.acquire_write_lock_metric, self.held_write_lock_metric)
self.writers = 0
self.read_ready = asyncio.Event()
self.urgent_read_done = asyncio.Event()
@classmethod
async def connect(cls, path: Union[bytes, str], *args, **kwargs):
@ -88,6 +115,7 @@ class AIOSQLite:
)
await asyncio.get_event_loop().run_in_executor(db.writer_executor, _connect_writer)
db.read_ready.set()
db.urgent_read_done.set()
return db
async def close(self):
@ -112,12 +140,28 @@ class AIOSQLite:
read_only=False, fetch_all: bool = False) -> List[dict]:
read_only_fn = run_read_only_fetchall if fetch_all else run_read_only_fetchone
parameters = parameters if parameters is not None else []
still_waiting = False
urgent_read = False
if read_only:
while self.writers:
await self.read_ready.wait()
return await asyncio.get_event_loop().run_in_executor(
self.reader_executor, read_only_fn, sql, parameters
)
self.waiting_reads_metric.inc()
self.read_count_metric.inc()
try:
while self.writers: # more writes can come in while we are waiting for the first
if not urgent_read and still_waiting and self.urgent_read_done.is_set():
# throttle the writes if they pile up
self.urgent_read_done.clear()
urgent_read = True
# wait until the running writes have finished
await self.read_ready.wait()
still_waiting = True
return await asyncio.get_event_loop().run_in_executor(
self.reader_executor, read_only_fn, sql, parameters
)
finally:
if urgent_read:
# unthrottle the writers if they had to be throttled
self.urgent_read_done.set()
self.waiting_reads_metric.dec()
if fetch_all:
return await self.run(lambda conn: conn.execute(sql, parameters).fetchall())
return await self.run(lambda conn: conn.execute(sql, parameters).fetchone())
@ -135,17 +179,32 @@ class AIOSQLite:
return self.run(lambda conn: conn.execute(sql, parameters))
async def run(self, fun, *args, **kwargs):
self.write_count_metric.inc()
self.waiting_writes_metric.inc()
# it's possible many writes are coming in one after the other, these can
# block reader calls for a long time
# if the reader waits for the writers to finish and then has to wait for
# yet more, it will clear the urgent_read_done event to block more writers
# piling on
try:
await self.urgent_read_done.wait()
except Exception as e:
self.waiting_writes_metric.dec()
raise e
self.writers += 1
# block readers
self.read_ready.clear()
async with self.write_lock:
try:
try:
async with self.write_lock:
return await asyncio.get_event_loop().run_in_executor(
self.writer_executor, lambda: self.__run_transaction(fun, *args, **kwargs)
)
finally:
self.writers -= 1
if not self.writers:
self.read_ready.set()
finally:
self.writers -= 1
self.waiting_writes_metric.dec()
if not self.writers:
# unblock the readers once the last enqueued writer finishes
self.read_ready.set()
def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs):
self.writer_connection.execute('begin')
@ -160,10 +219,26 @@ class AIOSQLite:
log.warning("rolled back")
raise
def run_with_foreign_keys_disabled(self, fun, *args, **kwargs) -> Awaitable:
return asyncio.get_event_loop().run_in_executor(
self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
)
async def run_with_foreign_keys_disabled(self, fun, *args, **kwargs):
self.write_count_metric.inc()
self.waiting_writes_metric.inc()
try:
await self.urgent_read_done.wait()
except Exception as e:
self.waiting_writes_metric.dec()
raise e
self.writers += 1
self.read_ready.clear()
try:
async with self.write_lock:
return await asyncio.get_event_loop().run_in_executor(
self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
)
finally:
self.writers -= 1
self.waiting_writes_metric.dec()
if not self.writers:
self.read_ready.set()
def __run_transaction_with_foreign_keys_disabled(self,
fun: Callable[[sqlite3.Connection, Any, Any], Any],
@ -579,7 +654,7 @@ class Database(SQLiteMixin):
return self.db.run(__many)
async def reserve_outputs(self, txos, is_reserved=True):
txoids = ((is_reserved, txo.id) for txo in txos)
txoids = [(is_reserved, txo.id) for txo in txos]
await self.db.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", txoids)
async def release_outputs(self, txos):

View file

@ -33,13 +33,12 @@ from asyncio import Event, CancelledError
import logging
import time
from contextlib import suppress
from prometheus_client import Counter, Histogram
from lbry.wallet.tasks import TaskGroup
from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification
from .jsonrpc import RPCError, ProtocolError
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT, RESET_CONNECTIONS
class Connector:
@ -372,10 +371,26 @@ class BatchRequest:
raise BatchError(self)
NAMESPACE = "wallet_server"
class RPCSession(SessionBase):
"""Base class for protocols where a message can lead to a response,
for example JSON RPC."""
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE,
labelnames=("method", "version"))
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=NAMESPACE, labelnames=("method", "version"))
REQUEST_ERRORS_COUNT = Counter(
"request_error", "Number of requests that returned errors", namespace=NAMESPACE,
labelnames=("method", "version")
)
RESET_CONNECTIONS = Counter(
"reset_clients", "Number of reset connections by client version",
namespace=NAMESPACE, labelnames=("version",)
)
def __init__(self, *, framer=None, loop=None, connection=None):
super().__init__(framer=framer, loop=loop)
self.connection = connection or self.default_connection()
@ -388,7 +403,7 @@ class RPCSession(SessionBase):
except MemoryError:
self.logger.warning('received oversized message from %s:%s, dropping connection',
self._address[0], self._address[1])
RESET_CONNECTIONS.labels(version=self.client_version).inc()
self.RESET_CONNECTIONS.labels(version=self.client_version).inc()
self._close()
return
@ -422,7 +437,7 @@ class RPCSession(SessionBase):
'internal server error')
if isinstance(request, Request):
message = request.send_result(result)
RESPONSE_TIMES.labels(
self.RESPONSE_TIMES.labels(
method=request.method,
version=self.client_version
).observe(time.perf_counter() - start)
@ -430,7 +445,7 @@ class RPCSession(SessionBase):
await self._send_message(message)
if isinstance(result, Exception):
self._bump_errors()
REQUEST_ERRORS_COUNT.labels(
self.REQUEST_ERRORS_COUNT.labels(
method=request.method,
version=self.client_version
).inc()
@ -467,7 +482,7 @@ class RPCSession(SessionBase):
async def send_notification(self, method, args=()):
"""Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args))
NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
await self._send_message(message)
def send_batch(self, raise_errors=False):

View file

@ -3,6 +3,7 @@ import asyncio
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional
from prometheus_client import Gauge, Histogram
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.db.writer import SQLDB
@ -10,7 +11,6 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES, REORG_COUNT
class Prefetcher:
@ -129,6 +129,9 @@ class ChainError(Exception):
"""Raised on error processing blocks."""
NAMESPACE = "wallet_server"
class BlockProcessor:
"""Process blocks and update the DB state to match.
@ -136,6 +139,14 @@ class BlockProcessor:
Coordinate backing up in case of chain reorganisations.
"""
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
block_update_time_metric = Histogram("block_time", "Block update times", namespace=NAMESPACE)
reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
def __init__(self, env, db, daemon, notifications):
self.env = env
self.db = db
@ -199,8 +210,8 @@ class BlockProcessor:
cache.clear()
await self._maybe_flush()
processed_time = time.perf_counter() - start
BLOCK_COUNT.set(self.height)
BLOCK_UPDATE_TIMES.observe(processed_time)
self.block_count_metric.set(self.height)
self.block_update_time_metric.observe(processed_time)
if not self.db.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
@ -255,7 +266,7 @@ class BlockProcessor:
last -= len(raw_blocks)
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
REORG_COUNT.inc()
self.reorg_count_metric.inc()
async def reorg_hashes(self, count):
"""Return a pair (start, last, hashes) of blocks to back up during a

View file

@ -6,11 +6,12 @@ from functools import wraps
from pylru import lrucache
import aiohttp
from prometheus_client import Gauge, Histogram
from lbry.wallet.rpc.jsonrpc import RPCError
from lbry.wallet.server.util import hex_to_bytes, class_logger
from lbry.wallet.rpc import JSONRPC
from lbry.wallet.server.prometheus import LBRYCRD_REQUEST_TIMES, LBRYCRD_PENDING_COUNT
class DaemonError(Exception):
"""Raised when the daemon returns an error in its results."""
@ -24,12 +25,23 @@ class WorkQueueFullError(Exception):
"""Internal - when the daemon's work queue is full."""
NAMESPACE = "wallet_server"
class Daemon:
"""Handles connections to a daemon at the given URL."""
WARMING_UP = -28
id_counter = itertools.count()
lbrycrd_request_time_metric = Histogram(
"lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",)
)
lbrycrd_pending_count_metric = Gauge(
"lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE,
labelnames=("method",)
)
def __init__(self, coin, url, max_workqueue=10, init_retry=0.25,
max_retry=4.0):
self.coin = coin
@ -129,7 +141,7 @@ class Daemon:
while True:
try:
for method in methods:
LBRYCRD_PENDING_COUNT.labels(method=method).inc()
self.lbrycrd_pending_count_metric.labels(method=method).inc()
result = await self._send_data(data)
result = processor(result)
if on_good_message:
@ -154,7 +166,7 @@ class Daemon:
on_good_message = 'running normally'
finally:
for method in methods:
LBRYCRD_PENDING_COUNT.labels(method=method).dec()
self.lbrycrd_pending_count_metric.labels(method=method).dec()
await asyncio.sleep(retry)
retry = max(min(self.max_retry, retry * 2), self.init_retry)
@ -175,7 +187,7 @@ class Daemon:
if params:
payload['params'] = params
result = await self._send(payload, processor)
LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start)
self.lbrycrd_request_time_metric.labels(method=method).observe(time.perf_counter() - start)
return result
async def _send_vector(self, method, params_iterable, replace_errs=False):
@ -200,7 +212,7 @@ class Daemon:
result = []
if payload:
result = await self._send(payload, processor)
LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start)
self.lbrycrd_request_time_metric.labels(method=method).observe(time.perf_counter() - start)
return result
async def _is_rpc_available(self, method):

View file

@ -1,89 +0,0 @@
import os
from aiohttp import web
from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge
from lbry import __version__ as version
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server import util
import lbry.wallet.server.version as wallet_server_version
NAMESPACE = "wallet_server"
CPU_COUNT = f"{os.cpu_count()}"
VERSION_INFO = Info('build', 'Wallet server build info (e.g. version, commit hash)', namespace=NAMESPACE)
VERSION_INFO.info({
'build': BUILD,
"commit": COMMIT_HASH,
"docker_tag": DOCKER_TAG,
'version': version,
"min_version": util.version_string(wallet_server_version.PROTOCOL_MIN),
"cpu_count": CPU_COUNT
})
SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version", ))
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method", "version"))
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method", "version"))
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=NAMESPACE, labelnames=("method", "version"))
REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE,
labelnames=("method", "version"))
SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
)
SQLITE_INTERNAL_ERROR_COUNT = Counter(
"internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
)
SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE)
SQLITE_PENDING_COUNT = Gauge(
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
)
LBRYCRD_REQUEST_TIMES = Histogram(
"lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",)
)
LBRYCRD_PENDING_COUNT = Gauge(
"lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE, labelnames=("method",)
)
CLIENT_VERSIONS = Counter(
"clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",)
)
BLOCK_COUNT = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAMESPACE)
REORG_COUNT = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
RESET_CONNECTIONS = Counter(
"reset_clients", "Number of reset connections by client version",
namespace=NAMESPACE, labelnames=("version",)
)
class PrometheusServer:
def __init__(self):
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.runner = None
async def start(self, port: int):
prom_app = web.Application()
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
self.runner = web.AppRunner(prom_app)
await self.runner.setup()
metrics_site = web.TCPSite(self.runner, "0.0.0.0", port, shutdown_timeout=.5)
await metrics_site.start()
self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
async def handle_metrics_get_request(self, request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),
content_type='text/plain; version=0.0.4'
)
except Exception:
self.logger.exception('could not generate prometheus data')
raise
async def stop(self):
await self.runner.cleanup()

View file

@ -6,7 +6,7 @@ import typing
import lbry
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
from lbry.wallet.server.prometheus import PrometheusServer
from lbry.prometheus import PrometheusServer
class Notifications:
@ -143,4 +143,4 @@ class Server:
async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
await self.prometheus_server.start(self.env.prometheus_port)
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)

View file

@ -20,16 +20,15 @@ from functools import partial
from binascii import hexlify
from pylru import lrucache
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from prometheus_client import Counter, Info, Histogram, Gauge
import lbry
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.server.prometheus import REQUESTS_COUNT, SQLITE_INTERRUPT_COUNT, SQLITE_INTERNAL_ERROR_COUNT
from lbry.wallet.server.prometheus import SQLITE_OPERATIONAL_ERROR_COUNT, SQLITE_EXECUTOR_TIMES, SESSIONS_COUNT
from lbry.wallet.server.prometheus import SQLITE_PENDING_COUNT, CLIENT_VERSIONS
from lbry.wallet.rpc.framing import NewlineFramer
import lbry.wallet.server.version as VERSION
@ -119,9 +118,45 @@ class SessionGroup:
self.semaphore = asyncio.Semaphore(20)
NAMESPACE = "wallet_server"
class SessionManager:
"""Holds global state about all sessions."""
version_info_metric = Info(
'build', 'Wallet server build info (e.g. version, commit hash)', namespace=NAMESPACE
)
version_info_metric.info({
'build': BUILD,
"commit": COMMIT_HASH,
"docker_tag": DOCKER_TAG,
'version': lbry.__version__,
"min_version": util.version_string(VERSION.PROTOCOL_MIN),
"cpu_count": os.cpu_count()
})
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",))
request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method", "version"))
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
)
db_error_metric = Counter(
"internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
)
executor_time_metric = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE)
pending_query_metric = Gauge(
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
)
client_version_metric = Counter(
"clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",)
)
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send)
@ -677,7 +712,7 @@ class SessionBase(RPCSession):
context = {'conn_id': f'{self.session_id}'}
self.logger = util.ConnectionLogger(self.logger, context)
self.group = self.session_mgr.add_session(self)
SESSIONS_COUNT.labels(version=self.client_version).inc()
self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
peer_addr_str = self.peer_address_str()
self.logger.info(f'{self.kind} {peer_addr_str}, '
f'{self.session_mgr.session_count():,d} total')
@ -686,7 +721,7 @@ class SessionBase(RPCSession):
"""Handle client disconnection."""
super().connection_lost(exc)
self.session_mgr.remove_session(self)
SESSIONS_COUNT.labels(version=self.client_version).dec()
self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
msg = ''
if not self._can_send.is_set():
msg += ' whilst paused'
@ -710,7 +745,7 @@ class SessionBase(RPCSession):
"""Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions.
"""
REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc()
self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc()
if isinstance(request, Request):
handler = self.request_handlers.get(request.method)
handler = partial(handler, self)
@ -946,7 +981,7 @@ class LBRYElectrumX(SessionBase):
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
SQLITE_PENDING_COUNT.inc()
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
@ -955,18 +990,18 @@ class LBRYElectrumX(SessionBase):
except reader.SQLiteInterruptedError as error:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_interrupt(start, error.metrics)
SQLITE_INTERRUPT_COUNT.inc()
self.session_mgr.self.session_mgr.SQLITE_INTERRUPT_COUNT.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
except reader.SQLiteOperationalError as error:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, error.metrics)
SQLITE_OPERATIONAL_ERROR_COUNT.inc()
self.session_mgr.db_operational_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
except Exception:
log.exception("dear devs, please handle this exception better")
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, {})
SQLITE_INTERNAL_ERROR_COUNT.inc()
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
if self.env.track_metrics:
@ -975,8 +1010,8 @@ class LBRYElectrumX(SessionBase):
metrics.query_response(start, metrics_data)
return base64.b64encode(result).decode()
finally:
SQLITE_PENDING_COUNT.dec()
SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start)
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def run_and_cache_query(self, query_name, function, kwargs):
metrics = self.get_metrics_or_placeholder_for_api(query_name)
@ -1443,10 +1478,10 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST,
f'unsupported client: {client_name}')
if self.client_version != client_name[:17]:
SESSIONS_COUNT.labels(version=self.client_version).dec()
self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
self.client_version = client_name[:17]
SESSIONS_COUNT.labels(version=self.client_version).inc()
CLIENT_VERSIONS.labels(version=self.client_version).inc()
self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
self.session_mgr.client_version_metric.labels(version=self.client_version).inc()
# Find the highest common protocol version. Disconnect if
# that protocol version in unsupported.

View file

@ -2,7 +2,6 @@ import logging
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server.prometheus import REORG_COUNT
class BlockchainReorganizationTests(CommandTestCase):
@ -16,7 +15,8 @@ class BlockchainReorganizationTests(CommandTestCase):
)
async def test_reorg(self):
REORG_COUNT.set(0)
bp = self.conductor.spv_node.server.bp
bp.reorg_count_metric.set(0)
# invalidate current block, move forward 2
self.assertEqual(self.ledger.headers.height, 206)
await self.assertBlockHash(206)
@ -26,7 +26,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(self.ledger.headers.height, 207)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
self.assertEqual(1, REORG_COUNT._samples()[0][2])
self.assertEqual(1, bp.reorg_count_metric._samples()[0][2])
# invalidate current block, move forward 3
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase):
await self.assertBlockHash(206)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
self.assertEqual(2, REORG_COUNT._samples()[0][2])
self.assertEqual(2, bp.reorg_count_metric._samples()[0][2])
async def test_reorg_change_claim_height(self):
# sanity check