diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index e4ce4a8a8..d14a8964b 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -40,6 +40,7 @@ from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Noti from .jsonrpc import RPCError, ProtocolError from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer from .util import Concurrency +from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT class Connector: @@ -405,6 +406,7 @@ class RPCSession(SessionBase): await self._task_group.add(self._handle_request(request)) async def _handle_request(self, request): + start = time.perf_counter() try: result = await self.handle_request(request) except (ProtocolError, RPCError) as e: @@ -417,10 +419,12 @@ class RPCSession(SessionBase): 'internal server error') if isinstance(request, Request): message = request.send_result(result) + RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start) if message: await self._send_message(message) if isinstance(result, Exception): self._bump_errors() + REQUEST_ERRORS_COUNT.labels(method=request.method).inc() def connection_lost(self, exc): # Cancel pending requests and message processing @@ -455,6 +459,7 @@ class RPCSession(SessionBase): """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) await self._send_message(message) + NOTIFICATION_COUNT.labels(method=method).inc() def send_batch(self, raise_errors=False): """Return a BatchRequest. 
Intended to be used like so: diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py index 90600af8d..28c0d686f 100644 --- a/lbry/wallet/server/daemon.py +++ b/lbry/wallet/server/daemon.py @@ -9,7 +9,7 @@ import aiohttp from lbry.wallet.rpc.jsonrpc import RPCError from lbry.wallet.server.util import hex_to_bytes, class_logger from lbry.wallet.rpc import JSONRPC - +from lbry.wallet.server.prometheus import LBRYCRD_REQUEST_TIMES, LBRYCRD_PENDING_COUNT class DaemonError(Exception): """Raised when the daemon returns an error in its results.""" @@ -106,6 +106,7 @@ class Daemon: Handles temporary connection issues. Daemon response errors are raise through DaemonError. """ + def log_error(error): nonlocal last_error_log, retry now = time.time() @@ -119,8 +120,13 @@ class Daemon: last_error_log = 0 data = json.dumps(payload) retry = self.init_retry + methods = tuple( + [payload['method']] if isinstance(payload, dict) else [request['method'] for request in payload] + ) while True: try: + for method in methods: + LBRYCRD_PENDING_COUNT.labels(method=method).inc() result = await self._send_data(data) result = processor(result) if on_good_message: @@ -143,12 +149,17 @@ class Daemon: except WorkQueueFullError: log_error('work queue full.') on_good_message = 'running normally' - + finally: + for method in methods: + LBRYCRD_PENDING_COUNT.labels(method=method).dec() await asyncio.sleep(retry) retry = max(min(self.max_retry, retry * 2), self.init_retry) async def _send_single(self, method, params=None): """Send a single request to the daemon.""" + + start = time.perf_counter() + def processor(result): err = result['error'] if not err: @@ -160,7 +171,9 @@ class Daemon: payload = {'method': method, 'id': next(self.id_counter)} if params: payload['params'] = params - return await self._send(payload, processor) + result = await self._send(payload, processor) + LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start) + return result async def 
_send_vector(self, method, params_iterable, replace_errs=False): """Send several requests of the same method. @@ -168,6 +181,9 @@ class Daemon: The result will be an array of the same length as params_iterable. If replace_errs is true, any item with an error is returned as None, otherwise an exception is raised.""" + + start = time.perf_counter() + def processor(result): errs = [item['error'] for item in result if item['error']] if any(err.get('code') == self.WARMING_UP for err in errs): @@ -178,9 +194,11 @@ class Daemon: payload = [{'method': method, 'params': p, 'id': next(self.id_counter)} for p in params_iterable] + result = [] if payload: - return await self._send(payload, processor) - return [] + result = await self._send(payload, processor) + LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start) + return result async def _is_rpc_available(self, method): """Return whether given RPC method is available in the daemon. diff --git a/lbry/wallet/server/prometheus.py b/lbry/wallet/server/prometheus.py index 72b41fc89..c36a565f3 100644 --- a/lbry/wallet/server/prometheus.py +++ b/lbry/wallet/server/prometheus.py @@ -1,5 +1,5 @@ from aiohttp import web -from prometheus_client import Counter, Info, generate_latest as prom_generate_latest +from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge from lbry import __version__ as version from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.wallet.server import util @@ -15,7 +15,36 @@ VERSION_INFO.info({ 'version': version, "min_version": util.version_string(wallet_server_version.PROTOCOL_MIN), }) -REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, labelnames=("method",)) +SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE) + +REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, + labelnames=("method",)) 
+RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method",)) +NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", + namespace=NAMESPACE, labelnames=("method",)) +REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE, + labelnames=("method",)) +SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) +SQLITE_OPERATIONAL_ERROR_COUNT = Counter( + "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE +) +SQLITE_INTERNAL_ERROR_COUNT = Counter( + "internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE +) +SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE) +SQLITE_PENDING_COUNT = Gauge( + "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE +) +LBRYCRD_REQUEST_TIMES = Histogram( + "lbrycrd_request", "lbrycrd request times", namespace=NAMESPACE, labelnames=("method",) +) +LBRYCRD_PENDING_COUNT = Gauge( + "lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE, labelnames=("method",) +) +CLIENT_VERSIONS = Counter( + "clients", "Number of connections received per client version", + namespace=NAMESPACE, labelnames=("version",) +) class PrometheusServer: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 826ed38cc..206c4d872 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -27,7 +27,9 @@ from lbry.wallet.server.db.writer import LBRYLevelDB from lbry.wallet.server.db import reader from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics -from lbry.wallet.server.prometheus import REQUESTS_COUNT +from lbry.wallet.server.prometheus import REQUESTS_COUNT, SQLITE_INTERRUPT_COUNT, 
SQLITE_INTERNAL_ERROR_COUNT +from lbry.wallet.server.prometheus import SQLITE_OPERATIONAL_ERROR_COUNT, SQLITE_EXECUTOR_TIMES, SESSIONS_COUNT +from lbry.wallet.server.prometheus import SQLITE_PENDING_COUNT, CLIENT_VERSIONS import lbry.wallet.server.version as VERSION from lbry.wallet.rpc import ( @@ -142,13 +144,6 @@ class SessionManager: self.session_event = Event() - # Set up the RPC request handlers - cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' - 'query reorg sessions stop'.split()) - LocalRPC.request_handlers.update( - {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} - ) - async def _start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() if kind == 'RPC': @@ -679,6 +674,7 @@ class SessionBase(RPCSession): context = {'conn_id': f'{self.session_id}'} self.logger = util.ConnectionLogger(self.logger, context) self.group = self.session_mgr.add_session(self) + SESSIONS_COUNT.inc() self.logger.info(f'{self.kind} {self.peer_address_str()}, ' f'{self.session_mgr.session_count():,d} total') @@ -686,6 +682,7 @@ class SessionBase(RPCSession): """Handle client disconnection.""" super().connection_lost(exc) self.session_mgr.remove_session(self) + SESSIONS_COUNT.dec() msg = '' if not self._can_send.is_set(): msg += ' whilst paused' @@ -712,6 +709,7 @@ class SessionBase(RPCSession): REQUESTS_COUNT.labels(method=request.method).inc() if isinstance(request, Request): handler = self.request_handlers.get(request.method) + handler = partial(handler, self) if handler is not None else None else: handler = None coro = handler_invocation(handler, request)() @@ -785,16 +783,56 @@ class LBRYElectrumX(SessionBase): session_mgr: LBRYSessionManager version = lbry.__version__ + @classmethod + def initialize_request_handlers(cls): + cls.request_handlers.update({ + 'blockchain.block.get_chunk': cls.block_get_chunk, + 'blockchain.block.get_header': cls.block_get_header, + 'blockchain.estimatefee': cls.estimatefee, + 'blockchain.relayfee': cls.relayfee, + 
'blockchain.scripthash.get_balance': cls.scripthash_get_balance, + 'blockchain.scripthash.get_history': cls.scripthash_get_history, + 'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool, + 'blockchain.scripthash.listunspent': cls.scripthash_listunspent, + 'blockchain.scripthash.subscribe': cls.scripthash_subscribe, + 'blockchain.transaction.broadcast': cls.transaction_broadcast, + 'blockchain.transaction.get': cls.transaction_get, + 'blockchain.transaction.get_merkle': cls.transaction_merkle, + 'server.add_peer': cls.add_peer, + 'server.banner': cls.banner, + 'server.donation_address': cls.donation_address, + 'server.features': cls.server_features_async, + 'server.peers.subscribe': cls.peers_subscribe, + 'server.version': cls.server_version, + 'blockchain.transaction.get_height': cls.transaction_get_height, + 'blockchain.claimtrie.search': cls.claimtrie_search, + 'blockchain.claimtrie.resolve': cls.claimtrie_resolve, + 'blockchain.claimtrie.getclaimsbyids': cls.claimtrie_getclaimsbyids, + 'blockchain.block.get_server_height': cls.get_server_height, + 'mempool.get_fee_histogram': cls.mempool_compact_histogram, + 'blockchain.block.headers': cls.block_headers, + 'server.ping': cls.ping, + 'blockchain.headers.subscribe': cls.headers_subscribe_False, + 'blockchain.address.get_balance': cls.address_get_balance, + 'blockchain.address.get_history': cls.address_get_history, + 'blockchain.address.get_mempool': cls.address_get_mempool, + 'blockchain.address.listunspent': cls.address_listunspent, + 'blockchain.address.subscribe': cls.address_subscribe, + 'blockchain.address.unsubscribe': cls.address_unsubscribe, + }) + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + if not LBRYElectrumX.request_handlers: + LBRYElectrumX.initialize_request_handlers() self.subscribe_headers = False self.subscribe_headers_raw = False self.connection.max_response_size = self.env.max_send self.hashX_subs = {} self.sv_seen = False self.mempool_statuses = {} - 
self.set_request_handlers(self.PROTOCOL_MIN) - # fixme: this is a rebase hack, we need to go through ChainState instead later + self.protocol_tuple = self.PROTOCOL_MIN + self.daemon = self.session_mgr.daemon self.bp: LBRYBlockProcessor = self.session_mgr.bp self.db: LBRYLevelDB = self.bp.db @@ -888,6 +926,7 @@ class LBRYElectrumX(SessionBase): async def run_in_executor(self, query_name, func, kwargs): start = time.perf_counter() try: + SQLITE_PENDING_COUNT.inc() result = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, func, kwargs ) @@ -896,23 +935,28 @@ class LBRYElectrumX(SessionBase): except reader.SQLiteInterruptedError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_interrupt(start, error.metrics) + SQLITE_INTERRUPT_COUNT.inc() raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') except reader.SQLiteOperationalError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, error.metrics) + SQLITE_OPERATIONAL_ERROR_COUNT.inc() raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute') except Exception: log.exception("dear devs, please handle this exception better") metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, {}) + SQLITE_INTERNAL_ERROR_COUNT.inc() raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') - - if self.env.track_metrics: - metrics = self.get_metrics_or_placeholder_for_api(query_name) - (result, metrics_data) = result - metrics.query_response(start, metrics_data) - - return base64.b64encode(result).decode() + else: + if self.env.track_metrics: + metrics = self.get_metrics_or_placeholder_for_api(query_name) + (result, metrics_data) = result + metrics.query_response(start, metrics_data) + return base64.b64encode(result).decode() + finally: + SQLITE_PENDING_COUNT.dec() + SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start) async def run_and_cache_query(self, 
query_name, function, kwargs): metrics = self.get_metrics_or_placeholder_for_api(query_name) @@ -935,6 +979,9 @@ class LBRYElectrumX(SessionBase): metrics.cache_response() return cache_item.result + async def mempool_compact_histogram(self): + return self.mempool.compact_fee_histogram() + async def claimtrie_search(self, **kwargs): if kwargs: return await self.run_and_cache_query('search', reader.search_to_bytes, kwargs) @@ -1370,6 +1417,7 @@ class LBRYElectrumX(SessionBase): raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}') self.client = client_name[:17] + CLIENT_VERSIONS.labels(version=str(client_name)).inc() # Find the highest common protocol version. Disconnect if # that protocol version in unsupported. @@ -1384,8 +1432,7 @@ class LBRYElectrumX(SessionBase): self.close_after_send = True raise RPCError(BAD_REQUEST, f'unsupported protocol version: {protocol_version}') - self.set_request_handlers(ptuple) - + self.protocol_tuple = ptuple return self.version, self.protocol_version_string() async def transaction_broadcast(self, raw_tx): @@ -1479,45 +1526,6 @@ class LBRYElectrumX(SessionBase): else: return tx_hash - def set_request_handlers(self, ptuple): - self.protocol_tuple = ptuple - handlers = { - 'blockchain.block.get_chunk': self.block_get_chunk, - 'blockchain.block.get_header': self.block_get_header, - 'blockchain.estimatefee': self.estimatefee, - 'blockchain.relayfee': self.relayfee, - 'blockchain.scripthash.get_balance': self.scripthash_get_balance, - 'blockchain.scripthash.get_history': self.scripthash_get_history, - 'blockchain.scripthash.get_mempool': self.scripthash_get_mempool, - 'blockchain.scripthash.listunspent': self.scripthash_listunspent, - 'blockchain.scripthash.subscribe': self.scripthash_subscribe, - 'blockchain.transaction.broadcast': self.transaction_broadcast, - 'blockchain.transaction.get': self.transaction_get, - 'blockchain.transaction.get_merkle': self.transaction_merkle, - 'server.add_peer': self.add_peer, - 
'server.banner': self.banner, - 'server.donation_address': self.donation_address, - 'server.features': self.server_features_async, - 'server.peers.subscribe': self.peers_subscribe, - 'server.version': self.server_version, - 'blockchain.transaction.get_height': self.transaction_get_height, - 'blockchain.claimtrie.search': self.claimtrie_search, - 'blockchain.claimtrie.resolve': self.claimtrie_resolve, - 'blockchain.claimtrie.getclaimsbyids': self.claimtrie_getclaimsbyids, - 'blockchain.block.get_server_height': self.get_server_height, - 'mempool.get_fee_histogram': self.mempool.compact_fee_histogram, - 'blockchain.block.headers': self.block_headers, - 'server.ping': self.ping, - 'blockchain.headers.subscribe': self.headers_subscribe_False, - 'blockchain.address.get_balance': self.address_get_balance, - 'blockchain.address.get_history': self.address_get_history, - 'blockchain.address.get_mempool': self.address_get_mempool, - 'blockchain.address.listunspent': self.address_listunspent, - 'blockchain.address.subscribe': self.address_subscribe, - 'blockchain.address.unsubscribe': self.address_unsubscribe, - } - self.request_handlers = handlers - class LocalRPC(SessionBase): """A local TCP RPC server session."""