diff --git a/lbry/conf.py b/lbry/conf.py index 3fc7f2687..a72fd6510 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -502,6 +502,7 @@ class Config(CLIConfig): ) udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port']) tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port']) + prometheus_port = Integer("Port to expose prometheus metrics (off by default)", 0) network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0') # routing table diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index f1bc44aed..752bae4c4 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -17,6 +17,7 @@ from functools import wraps, partial import ecdsa import base58 from aiohttp import web +from prometheus_client import generate_latest as prom_generate_latest, CONTENT_TYPE_LATEST as PROM_CONTENT_TYPE_LATEST from google.protobuf.message import DecodeError from lbry.wallet import ( Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic, @@ -319,6 +320,10 @@ class Daemon(metaclass=JSONRPCServerType): streaming_app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request) self.streaming_runner = web.AppRunner(streaming_app) + prom_app = web.Application() + prom_app.router.add_get('/metrics', self.handle_metrics_get_request) + self.metrics_runner = web.AppRunner(prom_app) + @property def dht_node(self) -> typing.Optional['Node']: return self.component_manager.get_component(DHT_COMPONENT) @@ -446,6 +451,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.analytics_manager.send_server_startup() await self.rpc_runner.setup() await self.streaming_runner.setup() + await self.metrics_runner.setup() try: rpc_site = web.TCPSite(self.rpc_runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5) @@ -467,6 +473,16 @@ class Daemon(metaclass=JSONRPCServerType): await self.analytics_manager.send_server_startup_error(str(e)) raise SystemExit() + if self.conf.prometheus_port: + try: + metrics_site = web.TCPSite(self.metrics_runner, "0.0.0.0", self.conf.prometheus_port, shutdown_timeout=.5) + await metrics_site.start() + log.info('metrics server listening on TCP %s:%i', *metrics_site._server.sockets[0].getsockname()[:2]) + except OSError as e: + log.error('metrics server failed to bind TCP :%i', self.conf.prometheus_port) + await self.analytics_manager.send_server_startup_error(str(e)) + raise SystemExit() + try: await self.initialize() except asyncio.CancelledError: @@ -498,6 +514,7 @@ class Daemon(metaclass=JSONRPCServerType): log.info("stopped api components") await self.rpc_runner.cleanup() await self.streaming_runner.cleanup() + await self.metrics_runner.cleanup() log.info("stopped api server") if self.analytics_manager.is_started: self.analytics_manager.stop() @@ -527,6 +544,16 @@ class Daemon(metaclass=JSONRPCServerType): content_type='application/json' ) + async def handle_metrics_get_request(self, request: web.Request): + try: + return web.Response( + text=prom_generate_latest().decode(), + content_type='text/plain; version=0.0.4' + ) + except Exception: + log.exception('could not generate prometheus data') + raise + async def handle_stream_get_request(self, request: web.Request): if not self.conf.streaming_get: log.warning("streaming_get is disabled, rejecting request") @@ -595,7 +622,7 @@ class Daemon(metaclass=JSONRPCServerType): # TODO: this is for backwards compatibility. Remove this once API and UI are updated # TODO: also delete EMPTY_PARAMS then _args, _kwargs = (), args[0] - elif isinstance(args, list) and len(args) == 2 and\ + elif isinstance(args, list) and len(args) == 2 and \ isinstance(args[0], list) and isinstance(args[1], dict): _args, _kwargs = args else: diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 510ec882a..dcc885ee6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -749,7 +749,7 @@ class LBRYBlockProcessor(BlockProcessor): timer = self.timer.sub_timers['advance_blocks'] undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs') timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True) - if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(20): + if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10): self.timer.show(height=height) return undo diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index fd1ca3c06..857b70803 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -59,6 +59,7 @@ class Env: self.ssl_certfile = self.required('SSL_CERTFILE') self.ssl_keyfile = self.required('SSL_KEYFILE') self.rpc_port = self.integer('RPC_PORT', 8000) + self.prometheus_port = self.integer('PROMETHEUS_PORT', 0) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file) diff --git a/lbry/wallet/server/prometheus.py b/lbry/wallet/server/prometheus.py new file mode 100644 index 000000000..651350f12 --- /dev/null +++ b/lbry/wallet/server/prometheus.py @@ -0,0 +1,36 @@ +from aiohttp import web +from prometheus_client import Counter, generate_latest as prom_generate_latest +from lbry.wallet.server import util + +NAMESPACE = "wallet_server" + +REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE) + + +class PrometheusServer: + def __init__(self): + self.logger = util.class_logger(__name__, self.__class__.__name__) + self.runner = None + + async def start(self, port: int): + prom_app = web.Application() + prom_app.router.add_get('/metrics', self.handle_metrics_get_request) + self.runner = web.AppRunner(prom_app) + await self.runner.setup() + + metrics_site = web.TCPSite(self.runner, "0.0.0.0", port, shutdown_timeout=.5) + await metrics_site.start() + self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2]) + + async def handle_metrics_get_request(self, request: web.Request): + try: + return web.Response( + text=prom_generate_latest().decode(), + content_type='text/plain; version=0.0.4' + ) + except Exception: + self.logger.exception('could not generate prometheus data') + raise + + async def stop(self): + await self.runner.cleanup() diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index f5cf8c68c..05a6cf74d 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -2,9 +2,11 @@ import signal import logging import asyncio from concurrent.futures.thread import ThreadPoolExecutor +import typing import lbry from lbry.wallet.server.mempool import MemPool, MemPoolAPI +from lbry.wallet.server.prometheus import PrometheusServer class Notifications: @@ -74,6 +76,7 @@ class Server: self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url) self.db = db = env.coin.DB(env) self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications) + self.prometheus_server: typing.Optional[PrometheusServer] = None # Set notifications up to implement the MemPoolAPI notifications.height = daemon.height @@ -107,11 +110,15 @@ class Server: await self.db.populate_header_merkle_cache() await _start_cancellable(self.mempool.keep_synchronized) await _start_cancellable(self.session_mgr.serve, self.notifications) + await _start_cancellable(self.start_prometheus) async def stop(self): for task in reversed(self.cancellable_tasks): task.cancel() await asyncio.wait(self.cancellable_tasks) + if self.prometheus_server: + await self.prometheus_server.stop() + self.prometheus_server = None self.shutdown_event.set() await self.daemon.close() @@ -132,3 +139,8 @@ class Server: finally: loop.run_until_complete(self.stop()) executor.shutdown(True) + + async def start_prometheus(self, event): + if not self.prometheus_server and self.env.prometheus_port: + self.prometheus_server = PrometheusServer() + await self.prometheus_server.start(self.env.prometheus_port) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 4a71ee781..786993f8c 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -27,6 +27,7 @@ from lbry.wallet.server.db.writer import LBRYDB from lbry.wallet.server.db import reader from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics +from lbry.wallet.server.prometheus import REQUESTS_COUNT from lbry.wallet.rpc import ( RPCSession, JSONRPCAutoDetect, JSONRPCConnection, @@ -709,6 +710,7 @@ class SessionBase(RPCSession): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. """ + REQUESTS_COUNT.inc() if isinstance(request, Request): handler = self.request_handlers.get(request.method) else: diff --git a/setup.py b/setup.py index 9f61b56c9..42ed2035f 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ setup( 'cryptography==2.5', 'protobuf==3.6.1', 'msgpack==0.6.1', + 'prometheus_client==0.7.1', 'ecdsa==0.13.3', 'pyyaml==4.2b1', 'docopt==0.6.2',