From 0a9d4de1265df4f96a9c6a58c2881891ee8d6209 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 1 May 2020 11:40:52 -0400 Subject: [PATCH 1/9] include write lock in try/finally --- lbry/wallet/database.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 5aec29649..ec92d6522 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -137,15 +137,15 @@ class AIOSQLite: async def run(self, fun, *args, **kwargs): self.writers += 1 self.read_ready.clear() - async with self.write_lock: - try: + try: + async with self.write_lock: return await asyncio.get_event_loop().run_in_executor( self.writer_executor, lambda: self.__run_transaction(fun, *args, **kwargs) ) - finally: - self.writers -= 1 - if not self.writers: - self.read_ready.set() + finally: + self.writers -= 1 + if not self.writers: + self.read_ready.set() def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs): self.writer_connection.execute('begin') From 79624febc09bd2dcd93453a83b74f98458b9889c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 1 May 2020 12:48:41 -0400 Subject: [PATCH 2/9] prevent pileup of writes blocking reads --- lbry/wallet/database.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index ec92d6522..446f53ee5 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -73,6 +73,7 @@ class AIOSQLite: self.write_lock = asyncio.Lock() self.writers = 0 self.read_ready = asyncio.Event() + self.urgent_read_done = asyncio.Event() @classmethod async def connect(cls, path: Union[bytes, str], *args, **kwargs): @@ -88,6 +89,7 @@ class AIOSQLite: ) await asyncio.get_event_loop().run_in_executor(db.writer_executor, _connect_writer) db.read_ready.set() + db.urgent_read_done.set() return db async def close(self): @@ -112,12 +114,25 @@ class AIOSQLite: 
read_only=False, fetch_all: bool = False) -> List[dict]: read_only_fn = run_read_only_fetchall if fetch_all else run_read_only_fetchone parameters = parameters if parameters is not None else [] + still_waiting = False + urgent_read = False if read_only: - while self.writers: - await self.read_ready.wait() - return await asyncio.get_event_loop().run_in_executor( - self.reader_executor, read_only_fn, sql, parameters - ) + try: + while self.writers: # more writes can come in while we are waiting for the first + if not urgent_read and still_waiting and self.urgent_read_done.is_set(): + # throttle the writes if they pile up + self.urgent_read_done.clear() + urgent_read = True + # wait until the running writes have finished + await self.read_ready.wait() + still_waiting = True + return await asyncio.get_event_loop().run_in_executor( + self.reader_executor, read_only_fn, sql, parameters + ) + finally: + if urgent_read: + # unthrottle the writers if they had to be throttled + self.urgent_read_done.set() if fetch_all: return await self.run(lambda conn: conn.execute(sql, parameters).fetchall()) return await self.run(lambda conn: conn.execute(sql, parameters).fetchone()) @@ -135,6 +150,7 @@ class AIOSQLite: return self.run(lambda conn: conn.execute(sql, parameters)) async def run(self, fun, *args, **kwargs): + await self.urgent_read_done.wait() self.writers += 1 self.read_ready.clear() try: From 36c05fc4b97e7092b185fd48c56334f20b12cc32 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 23 Apr 2020 20:21:27 -0400 Subject: [PATCH 3/9] move wallet server prometheus -only run wallet server metrics for the wallet server --- lbry/prometheus.py | 32 +++ lbry/wallet/rpc/session.py | 10 +- lbry/wallet/server/block_processor.py | 8 +- lbry/wallet/server/daemon.py | 11 +- lbry/wallet/server/prometheus.py | 188 +++++++++++------- lbry/wallet/server/server.py | 7 +- lbry/wallet/server/session.py | 28 ++- .../test_blockchain_reorganization.py | 8 +- 8 files changed, 181 insertions(+), 
111 deletions(-) create mode 100644 lbry/prometheus.py diff --git a/lbry/prometheus.py b/lbry/prometheus.py new file mode 100644 index 000000000..220ee97bd --- /dev/null +++ b/lbry/prometheus.py @@ -0,0 +1,32 @@ +import logging +from aiohttp import web +from prometheus_client import generate_latest as prom_generate_latest + + +class PrometheusServer: + def __init__(self, logger=None): + self.runner = None + self.logger = logger or logging.getLogger(__name__) + + async def start(self, interface: str, port: int): + prom_app = web.Application() + prom_app.router.add_get('/metrics', self.handle_metrics_get_request) + self.runner = web.AppRunner(prom_app) + await self.runner.setup() + + metrics_site = web.TCPSite(self.runner, interface, port, shutdown_timeout=.5) + await metrics_site.start() + self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2]) + + async def handle_metrics_get_request(self, request: web.Request): + try: + return web.Response( + text=prom_generate_latest().decode(), + content_type='text/plain; version=0.0.4' + ) + except Exception: + self.logger.exception('could not generate prometheus data') + raise + + async def stop(self): + await self.runner.cleanup() diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 53c164f4f..a454e3a30 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -39,7 +39,7 @@ from lbry.wallet.tasks import TaskGroup from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification from .jsonrpc import RPCError, ProtocolError from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer -from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT, RESET_CONNECTIONS +from lbry.wallet.server import prometheus class Connector: @@ -388,7 +388,7 @@ class RPCSession(SessionBase): except MemoryError: self.logger.warning('received oversized 
message from %s:%s, dropping connection', self._address[0], self._address[1]) - RESET_CONNECTIONS.labels(version=self.client_version).inc() + prometheus.METRICS.RESET_CONNECTIONS.labels(version=self.client_version).inc() self._close() return @@ -422,7 +422,7 @@ class RPCSession(SessionBase): 'internal server error') if isinstance(request, Request): message = request.send_result(result) - RESPONSE_TIMES.labels( + prometheus.METRICS.RESPONSE_TIMES.labels( method=request.method, version=self.client_version ).observe(time.perf_counter() - start) @@ -430,7 +430,7 @@ class RPCSession(SessionBase): await self._send_message(message) if isinstance(result, Exception): self._bump_errors() - REQUEST_ERRORS_COUNT.labels( + prometheus.METRICS.REQUEST_ERRORS_COUNT.labels( method=request.method, version=self.client_version ).inc() @@ -467,7 +467,7 @@ class RPCSession(SessionBase): async def send_notification(self, method, args=()): """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) - NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + prometheus.METRICS.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() await self._send_message(message) def send_batch(self, raise_errors=False): diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 44eba7d1a..48a4519d0 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -10,7 +10,7 @@ from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.leveldb import FlushData -from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES, REORG_COUNT +from lbry.wallet.server import prometheus class Prefetcher: @@ -199,8 +199,8 @@ class BlockProcessor: cache.clear() await self._maybe_flush() processed_time = 
time.perf_counter() - start - BLOCK_COUNT.set(self.height) - BLOCK_UPDATE_TIMES.observe(processed_time) + prometheus.METRICS.BLOCK_COUNT.set(self.height) + prometheus.METRICS.BLOCK_UPDATE_TIMES.observe(processed_time) if not self.db.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) @@ -255,7 +255,7 @@ class BlockProcessor: last -= len(raw_blocks) await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) await self.prefetcher.reset_height(self.height) - REORG_COUNT.inc() + prometheus.METRICS.REORG_COUNT.inc() async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py index 960c47024..39487f367 100644 --- a/lbry/wallet/server/daemon.py +++ b/lbry/wallet/server/daemon.py @@ -10,7 +10,8 @@ import aiohttp from lbry.wallet.rpc.jsonrpc import RPCError from lbry.wallet.server.util import hex_to_bytes, class_logger from lbry.wallet.rpc import JSONRPC -from lbry.wallet.server.prometheus import LBRYCRD_REQUEST_TIMES, LBRYCRD_PENDING_COUNT +from lbry.wallet.server import prometheus + class DaemonError(Exception): """Raised when the daemon returns an error in its results.""" @@ -129,7 +130,7 @@ class Daemon: while True: try: for method in methods: - LBRYCRD_PENDING_COUNT.labels(method=method).inc() + prometheus.METRICS.LBRYCRD_PENDING_COUNT.labels(method=method).inc() result = await self._send_data(data) result = processor(result) if on_good_message: @@ -154,7 +155,7 @@ class Daemon: on_good_message = 'running normally' finally: for method in methods: - LBRYCRD_PENDING_COUNT.labels(method=method).dec() + prometheus.METRICS.LBRYCRD_PENDING_COUNT.labels(method=method).dec() await asyncio.sleep(retry) retry = max(min(self.max_retry, retry * 2), self.init_retry) @@ -175,7 +176,7 @@ class Daemon: if params: payload['params'] = params result = 
await self._send(payload, processor) - LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start) + prometheus.METRICS.LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start) return result async def _send_vector(self, method, params_iterable, replace_errs=False): @@ -200,7 +201,7 @@ class Daemon: result = [] if payload: result = await self._send(payload, processor) - LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start) + prometheus.METRICS.LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start) return result async def _is_rpc_available(self, method): diff --git a/lbry/wallet/server/prometheus.py b/lbry/wallet/server/prometheus.py index e28976bf9..ee3e268cf 100644 --- a/lbry/wallet/server/prometheus.py +++ b/lbry/wallet/server/prometheus.py @@ -1,89 +1,125 @@ import os -from aiohttp import web -from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge +from prometheus_client import Counter, Info, Histogram, Gauge from lbry import __version__ as version from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.wallet.server import util import lbry.wallet.server.version as wallet_server_version -NAMESPACE = "wallet_server" -CPU_COUNT = f"{os.cpu_count()}" -VERSION_INFO = Info('build', 'Wallet server build info (e.g. 
version, commit hash)', namespace=NAMESPACE) -VERSION_INFO.info({ - 'build': BUILD, - "commit": COMMIT_HASH, - "docker_tag": DOCKER_TAG, - 'version': version, - "min_version": util.version_string(wallet_server_version.PROTOCOL_MIN), - "cpu_count": CPU_COUNT -}) -SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, - labelnames=("version", )) -REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, - labelnames=("method", "version")) -RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method", "version")) -NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", - namespace=NAMESPACE, labelnames=("method", "version")) -REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE, - labelnames=("method", "version")) -SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) -SQLITE_OPERATIONAL_ERROR_COUNT = Counter( - "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE -) -SQLITE_INTERNAL_ERROR_COUNT = Counter( - "internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE -) -SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE) -SQLITE_PENDING_COUNT = Gauge( - "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE -) -LBRYCRD_REQUEST_TIMES = Histogram( - "lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",) -) -LBRYCRD_PENDING_COUNT = Gauge( - "lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE, labelnames=("method",) -) -CLIENT_VERSIONS = Counter( - "clients", "Number of connections received per client version", - namespace=NAMESPACE, labelnames=("version",) -) -BLOCK_COUNT = Gauge( - 
"block_count", "Number of processed blocks", namespace=NAMESPACE -) -BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAMESPACE) -REORG_COUNT = Gauge( - "reorg_count", "Number of reorgs", namespace=NAMESPACE -) -RESET_CONNECTIONS = Counter( - "reset_clients", "Number of reset connections by client version", - namespace=NAMESPACE, labelnames=("version",) -) +class PrometheusMetrics: + VERSION_INFO: Info + SESSIONS_COUNT: Gauge + REQUESTS_COUNT: Counter + RESPONSE_TIMES: Histogram + NOTIFICATION_COUNT: Counter + REQUEST_ERRORS_COUNT: Counter + SQLITE_INTERRUPT_COUNT: Counter + SQLITE_OPERATIONAL_ERROR_COUNT: Counter + SQLITE_INTERNAL_ERROR_COUNT: Counter + SQLITE_EXECUTOR_TIMES: Histogram + SQLITE_PENDING_COUNT: Gauge + LBRYCRD_REQUEST_TIMES: Histogram + LBRYCRD_PENDING_COUNT: Gauge + CLIENT_VERSIONS: Counter + BLOCK_COUNT: Gauge + BLOCK_UPDATE_TIMES: Histogram + REORG_COUNT: Gauge + RESET_CONNECTIONS: Counter + + __slots__ = [ + 'VERSION_INFO', + 'SESSIONS_COUNT', + 'REQUESTS_COUNT', + 'RESPONSE_TIMES', + 'NOTIFICATION_COUNT', + 'REQUEST_ERRORS_COUNT', + 'SQLITE_INTERRUPT_COUNT', + 'SQLITE_OPERATIONAL_ERROR_COUNT', + 'SQLITE_INTERNAL_ERROR_COUNT', + 'SQLITE_EXECUTOR_TIMES', + 'SQLITE_PENDING_COUNT', + 'LBRYCRD_REQUEST_TIMES', + 'LBRYCRD_PENDING_COUNT', + 'CLIENT_VERSIONS', + 'BLOCK_COUNT', + 'BLOCK_UPDATE_TIMES', + 'REORG_COUNT', + 'RESET_CONNECTIONS', + '_installed', + 'namespace', + 'cpu_count' + ] -class PrometheusServer: def __init__(self): - self.logger = util.class_logger(__name__, self.__class__.__name__) - self.runner = None + self._installed = False + self.namespace = "wallet_server" + self.cpu_count = f"{os.cpu_count()}" - async def start(self, port: int): - prom_app = web.Application() - prom_app.router.add_get('/metrics', self.handle_metrics_get_request) - self.runner = web.AppRunner(prom_app) - await self.runner.setup() + def uninstall(self): + self._installed = False + for item in self.__slots__: + if not 
item.startswith('_') and item not in ('namespace', 'cpu_count'): + current = getattr(self, item, None) + if current: + setattr(self, item, None) + del current - metrics_site = web.TCPSite(self.runner, "0.0.0.0", port, shutdown_timeout=.5) - await metrics_site.start() - self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2]) + def install(self): + if self._installed: + return + self._installed = True + self.VERSION_INFO = Info('build', 'Wallet server build info (e.g. version, commit hash)', namespace=self.namespace) + self.VERSION_INFO.info({ + 'build': BUILD, + "commit": COMMIT_HASH, + "docker_tag": DOCKER_TAG, + 'version': version, + "min_version": util.version_string(wallet_server_version.PROTOCOL_MIN), + "cpu_count": self.cpu_count + }) + self.SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=self.namespace, + labelnames=("version",)) + self.REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=self.namespace, + labelnames=("method", "version")) + self.RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=self.namespace, + labelnames=("method", "version")) + self.NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", + namespace=self.namespace, labelnames=("method", "version")) + self.REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=self.namespace, + labelnames=("method", "version")) + self.SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=self.namespace) + self.SQLITE_OPERATIONAL_ERROR_COUNT = Counter( + "operational_error", "Number of queries that raised operational errors", namespace=self.namespace + ) + self.SQLITE_INTERNAL_ERROR_COUNT = Counter( + "internal_error", "Number of queries raising unexpected errors", namespace=self.namespace + ) + self.SQLITE_EXECUTOR_TIMES = 
Histogram("executor_time", "SQLite executor times", namespace=self.namespace) + self.SQLITE_PENDING_COUNT = Gauge( + "pending_queries_count", "Number of pending and running sqlite queries", namespace=self.namespace + ) + self.LBRYCRD_REQUEST_TIMES = Histogram( + "lbrycrd_request", "lbrycrd requests count", namespace=self.namespace, labelnames=("method",) + ) + self.LBRYCRD_PENDING_COUNT = Gauge( + "lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=self.namespace, + labelnames=("method",) + ) + self.CLIENT_VERSIONS = Counter( + "clients", "Number of connections received per client version", + namespace=self.namespace, labelnames=("version",) + ) + self.BLOCK_COUNT = Gauge( + "block_count", "Number of processed blocks", namespace=self.namespace + ) + self.BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=self.namespace) + self.REORG_COUNT = Gauge( + "reorg_count", "Number of reorgs", namespace=self.namespace + ) + self.RESET_CONNECTIONS = Counter( + "reset_clients", "Number of reset connections by client version", + namespace=self.namespace, labelnames=("version",) + ) - async def handle_metrics_get_request(self, request: web.Request): - try: - return web.Response( - text=prom_generate_latest().decode(), - content_type='text/plain; version=0.0.4' - ) - except Exception: - self.logger.exception('could not generate prometheus data') - raise - async def stop(self): - await self.runner.cleanup() +METRICS = PrometheusMetrics() diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index 4d0374ba4..98000ace9 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -6,7 +6,8 @@ import typing import lbry from lbry.wallet.server.mempool import MemPool, MemPoolAPI -from lbry.wallet.server.prometheus import PrometheusServer +from lbry.prometheus import PrometheusServer +from lbry.wallet.server.prometheus import METRICS class Notifications: @@ -92,6 +93,7 @@ class Server: ) async def 
start(self): + METRICS.install() env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() self.log.info(f'software version: {lbry.__version__}') @@ -121,6 +123,7 @@ class Server: self.prometheus_server = None self.shutdown_event.set() await self.daemon.close() + METRICS.uninstall() def run(self): loop = asyncio.get_event_loop() @@ -143,4 +146,4 @@ class Server: async def start_prometheus(self): if not self.prometheus_server and self.env.prometheus_port: self.prometheus_server = PrometheusServer() - await self.prometheus_server.start(self.env.prometheus_port) + await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9a9e23558..0dccce853 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -27,9 +27,7 @@ from lbry.wallet.server.db.writer import LBRYLevelDB from lbry.wallet.server.db import reader from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics -from lbry.wallet.server.prometheus import REQUESTS_COUNT, SQLITE_INTERRUPT_COUNT, SQLITE_INTERNAL_ERROR_COUNT -from lbry.wallet.server.prometheus import SQLITE_OPERATIONAL_ERROR_COUNT, SQLITE_EXECUTOR_TIMES, SESSIONS_COUNT -from lbry.wallet.server.prometheus import SQLITE_PENDING_COUNT, CLIENT_VERSIONS +from lbry.wallet.server import prometheus from lbry.wallet.rpc.framing import NewlineFramer import lbry.wallet.server.version as VERSION @@ -677,7 +675,7 @@ class SessionBase(RPCSession): context = {'conn_id': f'{self.session_id}'} self.logger = util.ConnectionLogger(self.logger, context) self.group = self.session_mgr.add_session(self) - SESSIONS_COUNT.labels(version=self.client_version).inc() + prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).inc() peer_addr_str = self.peer_address_str() self.logger.info(f'{self.kind} {peer_addr_str}, ' f'{self.session_mgr.session_count():,d} 
total') @@ -686,7 +684,7 @@ class SessionBase(RPCSession): """Handle client disconnection.""" super().connection_lost(exc) self.session_mgr.remove_session(self) - SESSIONS_COUNT.labels(version=self.client_version).dec() + prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).dec() msg = '' if not self._can_send.is_set(): msg += ' whilst paused' @@ -710,7 +708,7 @@ class SessionBase(RPCSession): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. """ - REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc() + prometheus.METRICS.REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc() if isinstance(request, Request): handler = self.request_handlers.get(request.method) handler = partial(handler, self) @@ -946,7 +944,7 @@ class LBRYElectrumX(SessionBase): async def run_in_executor(self, query_name, func, kwargs): start = time.perf_counter() try: - SQLITE_PENDING_COUNT.inc() + prometheus.METRICS.SQLITE_PENDING_COUNT.inc() result = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, func, kwargs ) @@ -955,18 +953,18 @@ class LBRYElectrumX(SessionBase): except reader.SQLiteInterruptedError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_interrupt(start, error.metrics) - SQLITE_INTERRUPT_COUNT.inc() + prometheus.METRICS.prometheus.METRICS.SQLITE_INTERRUPT_COUNT.inc() raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') except reader.SQLiteOperationalError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, error.metrics) - SQLITE_OPERATIONAL_ERROR_COUNT.inc() + prometheus.METRICS.SQLITE_OPERATIONAL_ERROR_COUNT.inc() raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute') except Exception: log.exception("dear devs, please handle this exception better") metrics = self.get_metrics_or_placeholder_for_api(query_name) 
metrics.query_error(start, {}) - SQLITE_INTERNAL_ERROR_COUNT.inc() + prometheus.METRICS.SQLITE_INTERNAL_ERROR_COUNT.inc() raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') else: if self.env.track_metrics: @@ -975,8 +973,8 @@ class LBRYElectrumX(SessionBase): metrics.query_response(start, metrics_data) return base64.b64encode(result).decode() finally: - SQLITE_PENDING_COUNT.dec() - SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start) + prometheus.METRICS.SQLITE_PENDING_COUNT.dec() + prometheus.METRICS.SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start) async def run_and_cache_query(self, query_name, function, kwargs): metrics = self.get_metrics_or_placeholder_for_api(query_name) @@ -1443,10 +1441,10 @@ class LBRYElectrumX(SessionBase): raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}') if self.client_version != client_name[:17]: - SESSIONS_COUNT.labels(version=self.client_version).dec() + prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).dec() self.client_version = client_name[:17] - SESSIONS_COUNT.labels(version=self.client_version).inc() - CLIENT_VERSIONS.labels(version=self.client_version).inc() + prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).inc() + prometheus.METRICS.CLIENT_VERSIONS.labels(version=self.client_version).inc() # Find the highest common protocol version. Disconnect if # that protocol version in unsupported. 
diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 216030839..5764c49c0 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -2,7 +2,7 @@ import logging import asyncio from binascii import hexlify from lbry.testcase import CommandTestCase -from lbry.wallet.server.prometheus import REORG_COUNT +from lbry.wallet.server import prometheus class BlockchainReorganizationTests(CommandTestCase): @@ -16,7 +16,7 @@ class BlockchainReorganizationTests(CommandTestCase): ) async def test_reorg(self): - REORG_COUNT.set(0) + prometheus.METRICS.REORG_COUNT.set(0) # invalidate current block, move forward 2 self.assertEqual(self.ledger.headers.height, 206) await self.assertBlockHash(206) @@ -26,7 +26,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertEqual(self.ledger.headers.height, 207) await self.assertBlockHash(206) await self.assertBlockHash(207) - self.assertEqual(1, REORG_COUNT._samples()[0][2]) + self.assertEqual(1, prometheus.METRICS.REORG_COUNT._samples()[0][2]) # invalidate current block, move forward 3 await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) @@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase): await self.assertBlockHash(206) await self.assertBlockHash(207) await self.assertBlockHash(208) - self.assertEqual(2, REORG_COUNT._samples()[0][2]) + self.assertEqual(2, prometheus.METRICS.REORG_COUNT._samples()[0][2]) async def test_reorg_change_claim_height(self): # sanity check From 797364ee5cebd1a8e1ef54a178bcc7c6837c4726 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 23 Apr 2020 21:17:44 -0400 Subject: [PATCH 4/9] refactor prometheus metrics --- lbry/extras/daemon/daemon.py | 24 +++- lbry/wallet/database.py | 19 ++- lbry/wallet/rpc/session.py | 27 +++- lbry/wallet/server/block_processor.py | 19 ++- 
lbry/wallet/server/daemon.py | 21 ++- lbry/wallet/server/prometheus.py | 125 ------------------ lbry/wallet/server/server.py | 3 - lbry/wallet/server/session.py | 63 +++++++-- .../test_blockchain_reorganization.py | 8 +- 9 files changed, 145 insertions(+), 164 deletions(-) delete mode 100644 lbry/wallet/server/prometheus.py diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 6e2ef7aa9..94186abc0 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -19,7 +19,7 @@ from functools import wraps, partial import ecdsa import base58 from aiohttp import web -from prometheus_client import generate_latest as prom_generate_latest +from prometheus_client import generate_latest as prom_generate_latest, Gauge, Histogram, Counter from google.protobuf.message import DecodeError from lbry.wallet import ( Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic, @@ -297,6 +297,20 @@ class Daemon(metaclass=JSONRPCServerType): callable_methods: dict deprecated_methods: dict + pending_requests_metric = Gauge( + "pending_requests", "Number of running api requests", namespace="daemon_api", + labelnames=("method",) + ) + + requests_count_metric = Counter( + "requests_count", "Number of requests received", namespace="daemon_api", + labelnames=("method",) + ) + response_time_metric = Histogram( + "response_time", "Response times", namespace="daemon_api", + labelnames=("method",) + ) + def __init__(self, conf: Config, component_manager: typing.Optional[ComponentManager] = None): self.conf = conf self.platform_info = system_info.get_platform() @@ -457,7 +471,6 @@ class Daemon(metaclass=JSONRPCServerType): log.info("Starting LBRYNet Daemon") log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2)) log.info("Platform: %s", json.dumps(self.platform_info, indent=2)) - self.need_connection_status_refresh.set() self._connection_status_task = self.component_manager.loop.create_task( self.keep_connection_status_up_to_date() @@ 
-663,7 +676,9 @@ class Daemon(metaclass=JSONRPCServerType): JSONRPCError.CODE_INVALID_PARAMS, params_error_message, ) - + self.pending_requests_metric.labels(method=function_name).inc() + self.requests_count_metric.labels(method=function_name).inc() + start = time.perf_counter() try: result = method(self, *_args, **_kwargs) if asyncio.iscoroutine(result): @@ -677,6 +692,9 @@ class Daemon(metaclass=JSONRPCServerType): return JSONRPCError.create_command_exception( command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc() ) + finally: + self.pending_requests_metric.labels(method=function_name).dec() + self.response_time_metric.labels(method=function_name).observe(time.perf_counter() - start) def _verify_method_is_callable(self, function_path): if function_path not in self.callable_methods: diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 446f53ee5..55f45ff63 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -3,6 +3,7 @@ import logging import asyncio import sqlite3 import platform +import time from binascii import hexlify from dataclasses import dataclass from contextvars import ContextVar @@ -10,6 +11,7 @@ from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutor from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional from datetime import date +from prometheus_client import Gauge from .bip32 import PubKey from .transaction import Transaction, Output, OutputScript, TXRefImmutable @@ -64,6 +66,13 @@ else: class AIOSQLite: reader_executor: ReaderExecutorClass + waiting_writes_metric = Gauge( + "waiting_writes_count", "Number of waiting db writes", namespace="daemon_database" + ) + waiting_reads_metric = Gauge( + "waiting_reads_count", "Number of waiting db writes", namespace="daemon_database" + ) + def __init__(self): # has to be single threaded as there is no mapping of thread:connection self.writer_executor = 
ThreadPoolExecutor(max_workers=1) @@ -117,6 +126,7 @@ class AIOSQLite: still_waiting = False urgent_read = False if read_only: + self.waiting_reads_metric.inc() try: while self.writers: # more writes can come in while we are waiting for the first if not urgent_read and still_waiting and self.urgent_read_done.is_set(): @@ -133,6 +143,7 @@ class AIOSQLite: if urgent_read: # unthrottle the writers if they had to be throttled self.urgent_read_done.set() + self.waiting_reads_metric.dec() if fetch_all: return await self.run(lambda conn: conn.execute(sql, parameters).fetchall()) return await self.run(lambda conn: conn.execute(sql, parameters).fetchone()) @@ -150,7 +161,12 @@ class AIOSQLite: return self.run(lambda conn: conn.execute(sql, parameters)) async def run(self, fun, *args, **kwargs): - await self.urgent_read_done.wait() + self.waiting_writes_metric.inc() + try: + await self.urgent_read_done.wait() + except Exception as e: + self.waiting_writes_metric.dec() + raise e self.writers += 1 self.read_ready.clear() try: @@ -160,6 +176,7 @@ class AIOSQLite: ) finally: self.writers -= 1 + self.waiting_writes_metric.dec() if not self.writers: self.read_ready.set() diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index a454e3a30..dc353b50e 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -33,13 +33,12 @@ from asyncio import Event, CancelledError import logging import time from contextlib import suppress - +from prometheus_client import Counter, Histogram from lbry.wallet.tasks import TaskGroup from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification from .jsonrpc import RPCError, ProtocolError from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer -from lbry.wallet.server import prometheus class Connector: @@ -372,10 +371,26 @@ class BatchRequest: raise BatchError(self) +NAMESPACE = "wallet_server" + + class RPCSession(SessionBase): """Base class 
for protocols where a message can lead to a response, for example JSON RPC.""" + RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, + labelnames=("method", "version")) + NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", + namespace=NAMESPACE, labelnames=("method", "version")) + REQUEST_ERRORS_COUNT = Counter( + "request_error", "Number of requests that returned errors", namespace=NAMESPACE, + labelnames=("method", "version") + ) + RESET_CONNECTIONS = Counter( + "reset_clients", "Number of reset connections by client version", + namespace=NAMESPACE, labelnames=("version",) + ) + def __init__(self, *, framer=None, loop=None, connection=None): super().__init__(framer=framer, loop=loop) self.connection = connection or self.default_connection() @@ -388,7 +403,7 @@ class RPCSession(SessionBase): except MemoryError: self.logger.warning('received oversized message from %s:%s, dropping connection', self._address[0], self._address[1]) - prometheus.METRICS.RESET_CONNECTIONS.labels(version=self.client_version).inc() + self.RESET_CONNECTIONS.labels(version=self.client_version).inc() self._close() return @@ -422,7 +437,7 @@ class RPCSession(SessionBase): 'internal server error') if isinstance(request, Request): message = request.send_result(result) - prometheus.METRICS.RESPONSE_TIMES.labels( + self.RESPONSE_TIMES.labels( method=request.method, version=self.client_version ).observe(time.perf_counter() - start) @@ -430,7 +445,7 @@ class RPCSession(SessionBase): await self._send_message(message) if isinstance(result, Exception): self._bump_errors() - prometheus.METRICS.REQUEST_ERRORS_COUNT.labels( + self.REQUEST_ERRORS_COUNT.labels( method=request.method, version=self.client_version ).inc() @@ -467,7 +482,7 @@ class RPCSession(SessionBase): async def send_notification(self, method, args=()): """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, 
args)) - prometheus.METRICS.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() await self._send_message(message) def send_batch(self, raise_errors=False): diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 48a4519d0..cb6a32f55 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -3,6 +3,7 @@ import asyncio from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional +from prometheus_client import Gauge, Histogram import lbry from lbry.schema.claim import Claim from lbry.wallet.server.db.writer import SQLDB @@ -10,7 +11,6 @@ from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.leveldb import FlushData -from lbry.wallet.server import prometheus class Prefetcher: @@ -129,6 +129,9 @@ class ChainError(Exception): """Raised on error processing blocks.""" +NAMESPACE = "wallet_server" + + class BlockProcessor: """Process blocks and update the DB state to match. @@ -136,6 +139,14 @@ class BlockProcessor: Coordinate backing up in case of chain reorganisations. 
""" + block_count_metric = Gauge( + "block_count", "Number of processed blocks", namespace=NAMESPACE + ) + block_update_time_metric = Histogram("block_time", "Block update times", namespace=NAMESPACE) + reorg_count_metric = Gauge( + "reorg_count", "Number of reorgs", namespace=NAMESPACE + ) + def __init__(self, env, db, daemon, notifications): self.env = env self.db = db @@ -199,8 +210,8 @@ class BlockProcessor: cache.clear() await self._maybe_flush() processed_time = time.perf_counter() - start - prometheus.METRICS.BLOCK_COUNT.set(self.height) - prometheus.METRICS.BLOCK_UPDATE_TIMES.observe(processed_time) + self.block_count_metric.set(self.height) + self.block_update_time_metric.observe(processed_time) if not self.db.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) @@ -255,7 +266,7 @@ class BlockProcessor: last -= len(raw_blocks) await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) await self.prefetcher.reset_height(self.height) - prometheus.METRICS.REORG_COUNT.inc() + self.reorg_count_metric.inc() async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py index 39487f367..44a366b6a 100644 --- a/lbry/wallet/server/daemon.py +++ b/lbry/wallet/server/daemon.py @@ -6,11 +6,11 @@ from functools import wraps from pylru import lrucache import aiohttp +from prometheus_client import Gauge, Histogram from lbry.wallet.rpc.jsonrpc import RPCError from lbry.wallet.server.util import hex_to_bytes, class_logger from lbry.wallet.rpc import JSONRPC -from lbry.wallet.server import prometheus class DaemonError(Exception): @@ -25,12 +25,23 @@ class WorkQueueFullError(Exception): """Internal - when the daemon's work queue is full.""" +NAMESPACE = "wallet_server" + + class Daemon: """Handles connections to a daemon at the given URL.""" 
WARMING_UP = -28 id_counter = itertools.count() + lbrycrd_request_time_metric = Histogram( + "lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",) + ) + lbrycrd_pending_count_metric = Gauge( + "lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE, + labelnames=("method",) + ) + def __init__(self, coin, url, max_workqueue=10, init_retry=0.25, max_retry=4.0): self.coin = coin @@ -130,7 +141,7 @@ class Daemon: while True: try: for method in methods: - prometheus.METRICS.LBRYCRD_PENDING_COUNT.labels(method=method).inc() + self.lbrycrd_pending_count_metric.labels(method=method).inc() result = await self._send_data(data) result = processor(result) if on_good_message: @@ -155,7 +166,7 @@ class Daemon: on_good_message = 'running normally' finally: for method in methods: - prometheus.METRICS.LBRYCRD_PENDING_COUNT.labels(method=method).dec() + self.lbrycrd_pending_count_metric.labels(method=method).dec() await asyncio.sleep(retry) retry = max(min(self.max_retry, retry * 2), self.init_retry) @@ -176,7 +187,7 @@ class Daemon: if params: payload['params'] = params result = await self._send(payload, processor) - prometheus.METRICS.LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start) + self.lbrycrd_request_time_metric.labels(method=method).observe(time.perf_counter() - start) return result async def _send_vector(self, method, params_iterable, replace_errs=False): @@ -201,7 +212,7 @@ class Daemon: result = [] if payload: result = await self._send(payload, processor) - prometheus.METRICS.LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start) + self.lbrycrd_request_time_metric.labels(method=method).observe(time.perf_counter() - start) return result async def _is_rpc_available(self, method): diff --git a/lbry/wallet/server/prometheus.py b/lbry/wallet/server/prometheus.py deleted file mode 100644 index ee3e268cf..000000000 --- a/lbry/wallet/server/prometheus.py 
+++ /dev/null @@ -1,125 +0,0 @@ -import os -from prometheus_client import Counter, Info, Histogram, Gauge -from lbry import __version__ as version -from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG -from lbry.wallet.server import util -import lbry.wallet.server.version as wallet_server_version - - -class PrometheusMetrics: - VERSION_INFO: Info - SESSIONS_COUNT: Gauge - REQUESTS_COUNT: Counter - RESPONSE_TIMES: Histogram - NOTIFICATION_COUNT: Counter - REQUEST_ERRORS_COUNT: Counter - SQLITE_INTERRUPT_COUNT: Counter - SQLITE_OPERATIONAL_ERROR_COUNT: Counter - SQLITE_INTERNAL_ERROR_COUNT: Counter - SQLITE_EXECUTOR_TIMES: Histogram - SQLITE_PENDING_COUNT: Gauge - LBRYCRD_REQUEST_TIMES: Histogram - LBRYCRD_PENDING_COUNT: Gauge - CLIENT_VERSIONS: Counter - BLOCK_COUNT: Gauge - BLOCK_UPDATE_TIMES: Histogram - REORG_COUNT: Gauge - RESET_CONNECTIONS: Counter - - __slots__ = [ - 'VERSION_INFO', - 'SESSIONS_COUNT', - 'REQUESTS_COUNT', - 'RESPONSE_TIMES', - 'NOTIFICATION_COUNT', - 'REQUEST_ERRORS_COUNT', - 'SQLITE_INTERRUPT_COUNT', - 'SQLITE_OPERATIONAL_ERROR_COUNT', - 'SQLITE_INTERNAL_ERROR_COUNT', - 'SQLITE_EXECUTOR_TIMES', - 'SQLITE_PENDING_COUNT', - 'LBRYCRD_REQUEST_TIMES', - 'LBRYCRD_PENDING_COUNT', - 'CLIENT_VERSIONS', - 'BLOCK_COUNT', - 'BLOCK_UPDATE_TIMES', - 'REORG_COUNT', - 'RESET_CONNECTIONS', - '_installed', - 'namespace', - 'cpu_count' - ] - - def __init__(self): - self._installed = False - self.namespace = "wallet_server" - self.cpu_count = f"{os.cpu_count()}" - - def uninstall(self): - self._installed = False - for item in self.__slots__: - if not item.startswith('_') and item not in ('namespace', 'cpu_count'): - current = getattr(self, item, None) - if current: - setattr(self, item, None) - del current - - def install(self): - if self._installed: - return - self._installed = True - self.VERSION_INFO = Info('build', 'Wallet server build info (e.g. 
version, commit hash)', namespace=self.namespace) - self.VERSION_INFO.info({ - 'build': BUILD, - "commit": COMMIT_HASH, - "docker_tag": DOCKER_TAG, - 'version': version, - "min_version": util.version_string(wallet_server_version.PROTOCOL_MIN), - "cpu_count": self.cpu_count - }) - self.SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=self.namespace, - labelnames=("version",)) - self.REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=self.namespace, - labelnames=("method", "version")) - self.RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=self.namespace, - labelnames=("method", "version")) - self.NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", - namespace=self.namespace, labelnames=("method", "version")) - self.REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=self.namespace, - labelnames=("method", "version")) - self.SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=self.namespace) - self.SQLITE_OPERATIONAL_ERROR_COUNT = Counter( - "operational_error", "Number of queries that raised operational errors", namespace=self.namespace - ) - self.SQLITE_INTERNAL_ERROR_COUNT = Counter( - "internal_error", "Number of queries raising unexpected errors", namespace=self.namespace - ) - self.SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=self.namespace) - self.SQLITE_PENDING_COUNT = Gauge( - "pending_queries_count", "Number of pending and running sqlite queries", namespace=self.namespace - ) - self.LBRYCRD_REQUEST_TIMES = Histogram( - "lbrycrd_request", "lbrycrd requests count", namespace=self.namespace, labelnames=("method",) - ) - self.LBRYCRD_PENDING_COUNT = Gauge( - "lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=self.namespace, - labelnames=("method",) - ) - 
self.CLIENT_VERSIONS = Counter( - "clients", "Number of connections received per client version", - namespace=self.namespace, labelnames=("version",) - ) - self.BLOCK_COUNT = Gauge( - "block_count", "Number of processed blocks", namespace=self.namespace - ) - self.BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=self.namespace) - self.REORG_COUNT = Gauge( - "reorg_count", "Number of reorgs", namespace=self.namespace - ) - self.RESET_CONNECTIONS = Counter( - "reset_clients", "Number of reset connections by client version", - namespace=self.namespace, labelnames=("version",) - ) - - -METRICS = PrometheusMetrics() diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index 98000ace9..cca84c852 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -7,7 +7,6 @@ import typing import lbry from lbry.wallet.server.mempool import MemPool, MemPoolAPI from lbry.prometheus import PrometheusServer -from lbry.wallet.server.prometheus import METRICS class Notifications: @@ -93,7 +92,6 @@ class Server: ) async def start(self): - METRICS.install() env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() self.log.info(f'software version: {lbry.__version__}') @@ -123,7 +121,6 @@ class Server: self.prometheus_server = None self.shutdown_event.set() await self.daemon.close() - METRICS.uninstall() def run(self): loop = asyncio.get_event_loop() diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0dccce853..1cce96ff9 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -20,14 +20,15 @@ from functools import partial from binascii import hexlify from pylru import lrucache from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from prometheus_client import Counter, Info, Histogram, Gauge import lbry +from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.wallet.server.block_processor import LBRYBlockProcessor 
from lbry.wallet.server.db.writer import LBRYLevelDB from lbry.wallet.server.db import reader from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics -from lbry.wallet.server import prometheus from lbry.wallet.rpc.framing import NewlineFramer import lbry.wallet.server.version as VERSION @@ -117,9 +118,45 @@ class SessionGroup: self.semaphore = asyncio.Semaphore(20) +NAMESPACE = "wallet_server" + + class SessionManager: """Holds global state about all sessions.""" + version_info_metric = Info( + 'build', 'Wallet server build info (e.g. version, commit hash)', namespace=NAMESPACE + ) + version_info_metric.info({ + 'build': BUILD, + "commit": COMMIT_HASH, + "docker_tag": DOCKER_TAG, + 'version': lbry.__version__, + "min_version": util.version_string(VERSION.PROTOCOL_MIN), + "cpu_count": f"{os.cpu_count()}" + }) + session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, + labelnames=("version",)) + request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, + labelnames=("method", "version")) + + interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) + db_operational_error_metric = Counter( + "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE + ) + db_error_metric = Counter( + "internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE + ) + executor_time_metric = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE) + pending_query_metric = Gauge( + "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE + ) + + client_version_metric = Counter( + "clients", "Number of connections received per client version", + namespace=NAMESPACE, labelnames=("version",) + ) + def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon',
mempool: 'MemPool', shutdown_event: asyncio.Event): env.max_send = max(350000, env.max_send) @@ -675,7 +712,7 @@ class SessionBase(RPCSession): context = {'conn_id': f'{self.session_id}'} self.logger = util.ConnectionLogger(self.logger, context) self.group = self.session_mgr.add_session(self) - prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).inc() + self.session_mgr.session_count_metric.labels(version=self.client_version).inc() peer_addr_str = self.peer_address_str() self.logger.info(f'{self.kind} {peer_addr_str}, ' f'{self.session_mgr.session_count():,d} total') @@ -684,7 +721,7 @@ class SessionBase(RPCSession): """Handle client disconnection.""" super().connection_lost(exc) self.session_mgr.remove_session(self) - prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).dec() + self.session_mgr.session_count_metric.labels(version=self.client_version).dec() msg = '' if not self._can_send.is_set(): msg += ' whilst paused' @@ -708,7 +745,7 @@ class SessionBase(RPCSession): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions.
""" - prometheus.METRICS.REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc() + self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc() if isinstance(request, Request): handler = self.request_handlers.get(request.method) handler = partial(handler, self) @@ -944,7 +981,7 @@ class LBRYElectrumX(SessionBase): async def run_in_executor(self, query_name, func, kwargs): start = time.perf_counter() try: - prometheus.METRICS.SQLITE_PENDING_COUNT.inc() + self.session_mgr.pending_query_metric.inc() result = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, func, kwargs ) @@ -953,18 +990,18 @@ class LBRYElectrumX(SessionBase): except reader.SQLiteInterruptedError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_interrupt(start, error.metrics) - prometheus.METRICS.prometheus.METRICS.SQLITE_INTERRUPT_COUNT.inc() + self.session_mgr.interrupt_count_metric.inc() raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') except reader.SQLiteOperationalError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, error.metrics) - prometheus.METRICS.SQLITE_OPERATIONAL_ERROR_COUNT.inc() + self.session_mgr.db_operational_error_metric.inc() raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute') except Exception: log.exception("dear devs, please handle this exception better") metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, {}) - prometheus.METRICS.SQLITE_INTERNAL_ERROR_COUNT.inc() + self.session_mgr.db_error_metric.inc() raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') else: if self.env.track_metrics: @@ -973,8 +1010,8 @@ class LBRYElectrumX(SessionBase): metrics.query_response(start, metrics_data) return base64.b64encode(result).decode() finally: - prometheus.METRICS.SQLITE_PENDING_COUNT.dec() -
prometheus.METRICS.SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start) + self.session_mgr.pending_query_metric.dec() + self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) async def run_and_cache_query(self, query_name, function, kwargs): metrics = self.get_metrics_or_placeholder_for_api(query_name) @@ -1441,10 +1478,10 @@ class LBRYElectrumX(SessionBase): raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}') if self.client_version != client_name[:17]: - prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).dec() + self.session_mgr.session_count_metric.labels(version=self.client_version).dec() self.client_version = client_name[:17] - prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).inc() - prometheus.METRICS.CLIENT_VERSIONS.labels(version=self.client_version).inc() + self.session_mgr.session_count_metric.labels(version=self.client_version).inc() + self.session_mgr.client_version_metric.labels(version=self.client_version).inc() # Find the highest common protocol version. Disconnect if # that protocol version in unsupported. 
diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 5764c49c0..af9349e67 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -2,7 +2,6 @@ import logging import asyncio from binascii import hexlify from lbry.testcase import CommandTestCase -from lbry.wallet.server import prometheus class BlockchainReorganizationTests(CommandTestCase): @@ -16,7 +15,8 @@ class BlockchainReorganizationTests(CommandTestCase): ) async def test_reorg(self): - prometheus.METRICS.REORG_COUNT.set(0) + bp = self.conductor.spv_node.server.bp + bp.reorg_count_metric.set(0) # invalidate current block, move forward 2 self.assertEqual(self.ledger.headers.height, 206) await self.assertBlockHash(206) @@ -26,7 +26,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertEqual(self.ledger.headers.height, 207) await self.assertBlockHash(206) await self.assertBlockHash(207) - self.assertEqual(1, prometheus.METRICS.REORG_COUNT._samples()[0][2]) + self.assertEqual(1, bp.reorg_count_metric._samples()[0][2]) # invalidate current block, move forward 3 await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) @@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase): await self.assertBlockHash(206) await self.assertBlockHash(207) await self.assertBlockHash(208) - self.assertEqual(2, prometheus.METRICS.REORG_COUNT._samples()[0][2]) + self.assertEqual(2, bp.reorg_count_metric._samples()[0][2]) async def test_reorg_change_claim_height(self): # sanity check From 3469abaefdcc1c7f9cd03b6a34f05727121e36e6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 2 May 2020 21:23:17 -0400 Subject: [PATCH 5/9] write lock metrics --- lbry/utils.py | 24 ++++++++++++++++++++++ lbry/wallet/database.py | 44 ++++++++++++++++++++++++++++++++++------- 2 files changed, 61 insertions(+), 7 
deletions(-) diff --git a/lbry/utils.py b/lbry/utils.py index c24d8a971..4aa1215e3 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -3,6 +3,7 @@ import codecs import datetime import random import socket +import time import string import sys import json @@ -19,6 +20,7 @@ import pkg_resources import certifi import aiohttp +from prometheus_client import Histogram from lbry.schema.claim import Claim log = logging.getLogger(__name__) @@ -282,3 +284,25 @@ async def get_external_ip() -> typing.Optional[str]: # used if upnp is disabled def is_running_from_bundle(): # see https://pyinstaller.readthedocs.io/en/stable/runtime-information.html return getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS') + + +class LockWithMetrics(asyncio.Lock): + def __init__(self, acquire_metric, held_time_metric, loop=None): + super().__init__(loop=loop) + self._acquire_metric = acquire_metric + self._lock_held_time_metric = held_time_metric + self._lock_acquired_time = None + + async def acquire(self): + start = time.perf_counter() + try: + return await super().acquire() + finally: + self._lock_acquired_time = time.perf_counter() + self._acquire_metric.observe(self._lock_acquired_time - start) + + def release(self): + try: + return super().release() + finally: + self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 55f45ff63..8a39ee22a 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -3,7 +3,6 @@ import logging import asyncio import sqlite3 import platform -import time from binascii import hexlify from dataclasses import dataclass from contextvars import ContextVar @@ -11,7 +10,8 @@ from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutor from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional from datetime import date -from prometheus_client import Gauge +from 
prometheus_client import Gauge, Counter, Histogram +from lbry.utils import LockWithMetrics from .bip32 import PubKey from .transaction import Transaction, Output, OutputScript, TXRefImmutable @@ -72,6 +72,18 @@ class AIOSQLite: waiting_reads_metric = Gauge( "waiting_reads_count", "Number of waiting db writes", namespace="daemon_database" ) + write_count_metric = Counter( + "write_count", "Number of database writes", namespace="daemon_database" + ) + read_count_metric = Counter( + "read_count", "Number of database reads", namespace="daemon_database" + ) + acquire_write_lock_metric = Histogram( + f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database" + ) + held_write_lock_metric = Histogram( + f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database" + ) def __init__(self): # has to be single threaded as there is no mapping of thread:connection @@ -79,7 +91,7 @@ class AIOSQLite: self.writer_connection: Optional[sqlite3.Connection] = None self._closing = False self.query_count = 0 - self.write_lock = asyncio.Lock() + self.write_lock = LockWithMetrics(self.acquire_write_lock_metric, self.held_write_lock_metric) self.writers = 0 self.read_ready = asyncio.Event() self.urgent_read_done = asyncio.Event() @@ -127,6 +139,7 @@ class AIOSQLite: urgent_read = False if read_only: self.waiting_reads_metric.inc() + self.read_count_metric.inc() try: while self.writers: # more writes can come in while we are waiting for the first if not urgent_read and still_waiting and self.urgent_read_done.is_set(): @@ -161,6 +174,7 @@ class AIOSQLite: return self.run(lambda conn: conn.execute(sql, parameters)) async def run(self, fun, *args, **kwargs): + self.write_count_metric.inc() self.waiting_writes_metric.inc() try: await self.urgent_read_done.wait() @@ -193,10 +207,26 @@ class AIOSQLite: log.warning("rolled back") raise - def run_with_foreign_keys_disabled(self, fun, *args, **kwargs) -> Awaitable: - return 
asyncio.get_event_loop().run_in_executor( - self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs - ) + async def run_with_foreign_keys_disabled(self, fun, *args, **kwargs): + self.write_count_metric.inc() + self.waiting_writes_metric.inc() + try: + await self.urgent_read_done.wait() + except Exception as e: + self.waiting_writes_metric.dec() + raise e + self.writers += 1 + self.read_ready.clear() + try: + async with self.write_lock: + return await asyncio.get_event_loop().run_in_executor( + self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs + ) + finally: + self.writers -= 1 + self.waiting_writes_metric.dec() + if not self.writers: + self.read_ready.set() def __run_transaction_with_foreign_keys_disabled(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], From 87f751188e12413e0d7c34ec29008df475cd2bff Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 2 May 2020 21:58:41 -0400 Subject: [PATCH 6/9] cancelled and failed api request metrics --- lbry/extras/daemon/daemon.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 94186abc0..e27cd32cd 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -306,6 +306,14 @@ class Daemon(metaclass=JSONRPCServerType): "requests_count", "Number of requests received", namespace="daemon_api", labelnames=("method",) ) + failed_request_metric = Counter( + "failed_request_count", "Number of failed requests", namespace="daemon_api", + labelnames=("method",) + ) + cancelled_request_metric = Counter( + "cancelled_request_count", "Number of cancelled requests", namespace="daemon_api", + labelnames=("method",) + ) response_time_metric = Histogram( "response_time", "Response times", namespace="daemon_api", labelnames=("method",) @@ -685,9 +693,11 @@ class Daemon(metaclass=JSONRPCServerType): result = await result return result except 
asyncio.CancelledError: + self.cancelled_request_metric.labels(method=function_name).inc() log.info("cancelled API call for: %s", function_name) raise except Exception as e: # pylint: disable=broad-except + self.failed_request_metric.labels(method=function_name).inc() log.exception("error handling api request") return JSONRPCError.create_command_exception( command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc() From d3ffae72fb529cc7d581797024ecf773d7f79fa5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 2 May 2020 22:30:25 -0400 Subject: [PATCH 7/9] buckets --- lbry/extras/daemon/daemon.py | 7 ++++++- lbry/wallet/database.py | 10 +++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index e27cd32cd..5c84f8227 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -290,6 +290,11 @@ class JSONRPCServerType(type): return klass +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + + class Daemon(metaclass=JSONRPCServerType): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -315,7 +320,7 @@ class Daemon(metaclass=JSONRPCServerType): labelnames=("method",) ) response_time_metric = Histogram( - "response_time", "Response times", namespace="daemon_api", + "response_time", "Response times", namespace="daemon_api", buckets=HISTOGRAM_BUCKETS, labelnames=("method",) ) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 8a39ee22a..4f92ad445 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -22,6 +22,10 @@ from .util import date_to_julian_day log = logging.getLogger(__name__) sqlite3.enable_callback_tracebacks(True) +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + @dataclass class ReaderProcessState: @@ -79,10 +83,10 
@@ class AIOSQLite: "read_count", "Number of database reads", namespace="daemon_database" ) acquire_write_lock_metric = Histogram( - f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database" + f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database", buckets=HISTOGRAM_BUCKETS ) held_write_lock_metric = Histogram( - f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database" + f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database", buckets=HISTOGRAM_BUCKETS ) def __init__(self): @@ -642,7 +646,7 @@ class Database(SQLiteMixin): return self.db.run(__many) async def reserve_outputs(self, txos, is_reserved=True): - txoids = ((is_reserved, txo.id) for txo in txos) + txoids = [(is_reserved, txo.id) for txo in txos] await self.db.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", txoids) async def release_outputs(self, txos): From e3abab6d4d28e924dbdddeb03c1bdc643391062a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 4 May 2020 12:09:09 -0400 Subject: [PATCH 8/9] pylint --- lbry/utils.py | 1 - lbry/wallet/database.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lbry/utils.py b/lbry/utils.py index 4aa1215e3..0db443cd9 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -20,7 +20,6 @@ import pkg_resources import certifi import aiohttp -from prometheus_client import Histogram from lbry.schema.claim import Claim log = logging.getLogger(__name__) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 4f92ad445..31d078cec 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -86,7 +86,8 @@ class AIOSQLite: f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database", buckets=HISTOGRAM_BUCKETS ) held_write_lock_metric = Histogram( - f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database", buckets=HISTOGRAM_BUCKETS + 
f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database", + buckets=HISTOGRAM_BUCKETS ) def __init__(self): From 750ff448ad3d2b7b8528d8d8a3a3d343e95a0d96 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 4 May 2020 13:47:37 -0400 Subject: [PATCH 9/9] comments --- lbry/wallet/database.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 31d078cec..9c8ad7695 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -181,12 +181,18 @@ class AIOSQLite: async def run(self, fun, *args, **kwargs): self.write_count_metric.inc() self.waiting_writes_metric.inc() + # it's possible many writes are coming in one after the other, these can + # block reader calls for a long time + # if the reader waits for the writers to finish and then has to wait for + # yet more, it will clear the urgent_read_done event to block more writers + # piling on try: await self.urgent_read_done.wait() except Exception as e: self.waiting_writes_metric.dec() raise e self.writers += 1 + # block readers self.read_ready.clear() try: async with self.write_lock: @@ -197,6 +203,7 @@ class AIOSQLite: self.writers -= 1 self.waiting_writes_metric.dec() if not self.writers: + # unblock the readers once the last enqueued writer finishes self.read_ready.set() def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs):