Merge pull request #2734 from lbryio/prom2
Add prometheus metrics collection to client and server (take two)
This commit is contained in:
commit
254e184677
8 changed files with 82 additions and 2 deletions
|
@ -502,6 +502,7 @@ class Config(CLIConfig):
|
|||
)
|
||||
udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port'])
|
||||
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
|
||||
prometheus_port = Integer("Port to expose prometheus metrics (off by default)", 0)
|
||||
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
|
||||
|
||||
# routing table
|
||||
|
|
|
@ -17,6 +17,7 @@ from functools import wraps, partial
|
|||
import ecdsa
|
||||
import base58
|
||||
from aiohttp import web
|
||||
from prometheus_client import generate_latest as prom_generate_latest
|
||||
from google.protobuf.message import DecodeError
|
||||
from lbry.wallet import (
|
||||
Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic,
|
||||
|
@ -319,6 +320,10 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
streaming_app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
|
||||
self.streaming_runner = web.AppRunner(streaming_app)
|
||||
|
||||
prom_app = web.Application()
|
||||
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
||||
self.metrics_runner = web.AppRunner(prom_app)
|
||||
|
||||
@property
|
||||
def dht_node(self) -> typing.Optional['Node']:
|
||||
return self.component_manager.get_component(DHT_COMPONENT)
|
||||
|
@ -446,6 +451,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
await self.analytics_manager.send_server_startup()
|
||||
await self.rpc_runner.setup()
|
||||
await self.streaming_runner.setup()
|
||||
await self.metrics_runner.setup()
|
||||
|
||||
try:
|
||||
rpc_site = web.TCPSite(self.rpc_runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5)
|
||||
|
@ -467,6 +473,16 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
await self.analytics_manager.send_server_startup_error(str(e))
|
||||
raise SystemExit()
|
||||
|
||||
if self.conf.prometheus_port:
|
||||
try:
|
||||
prom_site = web.TCPSite(self.metrics_runner, "0.0.0.0", self.conf.prometheus_port, shutdown_timeout=.5)
|
||||
await prom_site.start()
|
||||
log.info('metrics server listening on TCP %s:%i', *prom_site._server.sockets[0].getsockname()[:2])
|
||||
except OSError as e:
|
||||
log.error('metrics server failed to bind TCP :%i', self.conf.prometheus_port)
|
||||
await self.analytics_manager.send_server_startup_error(str(e))
|
||||
raise SystemExit()
|
||||
|
||||
try:
|
||||
await self.initialize()
|
||||
except asyncio.CancelledError:
|
||||
|
@ -498,6 +514,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
log.info("stopped api components")
|
||||
await self.rpc_runner.cleanup()
|
||||
await self.streaming_runner.cleanup()
|
||||
await self.metrics_runner.cleanup()
|
||||
log.info("stopped api server")
|
||||
if self.analytics_manager.is_started:
|
||||
self.analytics_manager.stop()
|
||||
|
@ -527,6 +544,16 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
content_type='application/json'
|
||||
)
|
||||
|
||||
async def handle_metrics_get_request(self, request: web.Request):
|
||||
try:
|
||||
return web.Response(
|
||||
text=prom_generate_latest().decode(),
|
||||
content_type='text/plain; version=0.0.4'
|
||||
)
|
||||
except Exception:
|
||||
log.exception('could not generate prometheus data')
|
||||
raise
|
||||
|
||||
async def handle_stream_get_request(self, request: web.Request):
|
||||
if not self.conf.streaming_get:
|
||||
log.warning("streaming_get is disabled, rejecting request")
|
||||
|
@ -595,7 +622,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
# TODO: this is for backwards compatibility. Remove this once API and UI are updated
|
||||
# TODO: also delete EMPTY_PARAMS then
|
||||
_args, _kwargs = (), args[0]
|
||||
elif isinstance(args, list) and len(args) == 2 and\
|
||||
elif isinstance(args, list) and len(args) == 2 and \
|
||||
isinstance(args[0], list) and isinstance(args[1], dict):
|
||||
_args, _kwargs = args
|
||||
else:
|
||||
|
|
|
@ -749,7 +749,7 @@ class LBRYBlockProcessor(BlockProcessor):
|
|||
timer = self.timer.sub_timers['advance_blocks']
|
||||
undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs')
|
||||
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
|
||||
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(20):
|
||||
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
|
||||
self.timer.show(height=height)
|
||||
return undo
|
||||
|
||||
|
|
|
@ -59,6 +59,7 @@ class Env:
|
|||
self.ssl_certfile = self.required('SSL_CERTFILE')
|
||||
self.ssl_keyfile = self.required('SSL_KEYFILE')
|
||||
self.rpc_port = self.integer('RPC_PORT', 8000)
|
||||
self.prometheus_port = self.integer('PROMETHEUS_PORT', 0)
|
||||
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
|
||||
self.banner_file = self.default('BANNER_FILE', None)
|
||||
self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
|
||||
|
|
36
lbry/wallet/server/prometheus.py
Normal file
36
lbry/wallet/server/prometheus.py
Normal file
|
@ -0,0 +1,36 @@
|
|||
from aiohttp import web
|
||||
from prometheus_client import Counter, generate_latest as prom_generate_latest
|
||||
from lbry.wallet.server import util
|
||||
|
||||
NAMESPACE = "wallet_server"
|
||||
|
||||
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE)
|
||||
|
||||
|
||||
class PrometheusServer:
|
||||
def __init__(self):
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.runner = None
|
||||
|
||||
async def start(self, port: int):
|
||||
prom_app = web.Application()
|
||||
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
||||
self.runner = web.AppRunner(prom_app)
|
||||
await self.runner.setup()
|
||||
|
||||
metrics_site = web.TCPSite(self.runner, "0.0.0.0", port, shutdown_timeout=.5)
|
||||
await metrics_site.start()
|
||||
self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
|
||||
|
||||
async def handle_metrics_get_request(self, request: web.Request):
|
||||
try:
|
||||
return web.Response(
|
||||
text=prom_generate_latest().decode(),
|
||||
content_type='text/plain; version=0.0.4'
|
||||
)
|
||||
except Exception:
|
||||
self.logger.exception('could not generate prometheus data')
|
||||
raise
|
||||
|
||||
async def stop(self):
|
||||
await self.runner.cleanup()
|
|
@ -2,9 +2,11 @@ import signal
|
|||
import logging
|
||||
import asyncio
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
import typing
|
||||
|
||||
import lbry
|
||||
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
|
||||
from lbry.wallet.server.prometheus import PrometheusServer
|
||||
|
||||
|
||||
class Notifications:
|
||||
|
@ -74,6 +76,7 @@ class Server:
|
|||
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
|
||||
self.db = db = env.coin.DB(env)
|
||||
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications)
|
||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||
|
||||
# Set notifications up to implement the MemPoolAPI
|
||||
notifications.height = daemon.height
|
||||
|
@ -107,11 +110,15 @@ class Server:
|
|||
await self.db.populate_header_merkle_cache()
|
||||
await _start_cancellable(self.mempool.keep_synchronized)
|
||||
await _start_cancellable(self.session_mgr.serve, self.notifications)
|
||||
await self.start_prometheus()
|
||||
|
||||
async def stop(self):
|
||||
for task in reversed(self.cancellable_tasks):
|
||||
task.cancel()
|
||||
await asyncio.wait(self.cancellable_tasks)
|
||||
if self.prometheus_server:
|
||||
await self.prometheus_server.stop()
|
||||
self.prometheus_server = None
|
||||
self.shutdown_event.set()
|
||||
await self.daemon.close()
|
||||
|
||||
|
@ -132,3 +139,8 @@ class Server:
|
|||
finally:
|
||||
loop.run_until_complete(self.stop())
|
||||
executor.shutdown(True)
|
||||
|
||||
async def start_prometheus(self):
|
||||
if not self.prometheus_server and self.env.prometheus_port:
|
||||
self.prometheus_server = PrometheusServer()
|
||||
await self.prometheus_server.start(self.env.prometheus_port)
|
||||
|
|
|
@ -27,6 +27,7 @@ from lbry.wallet.server.db.writer import LBRYLevelDB
|
|||
from lbry.wallet.server.db import reader
|
||||
from lbry.wallet.server.websocket import AdminWebSocket
|
||||
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
|
||||
from lbry.wallet.server.prometheus import REQUESTS_COUNT
|
||||
|
||||
from lbry.wallet.rpc import (
|
||||
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
|
||||
|
@ -707,6 +708,7 @@ class SessionBase(RPCSession):
|
|||
"""Handle an incoming request. ElectrumX doesn't receive
|
||||
notifications from client sessions.
|
||||
"""
|
||||
REQUESTS_COUNT.inc()
|
||||
if isinstance(request, Request):
|
||||
handler = self.request_handlers.get(request.method)
|
||||
else:
|
||||
|
|
1
setup.py
1
setup.py
|
@ -44,6 +44,7 @@ setup(
|
|||
'cryptography==2.5',
|
||||
'protobuf==3.6.1',
|
||||
'msgpack==0.6.1',
|
||||
'prometheus_client==0.7.1',
|
||||
'ecdsa==0.13.3',
|
||||
'pyyaml==4.2b1',
|
||||
'docopt==0.6.2',
|
||||
|
|
Loading…
Reference in a new issue