Merge branch 'prometheus'

* prometheus:
  Add prometheus metrics collection to client and server
This commit is contained in:
Alex Grintsvayg 2020-01-20 12:45:11 -05:00
commit 709128225b
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
8 changed files with 82 additions and 2 deletions

View file

@ -502,6 +502,7 @@ class Config(CLIConfig):
)
udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port'])
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
prometheus_port = Integer("Port to expose prometheus metrics (off by default)", 0)
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
# routing table

View file

@ -17,6 +17,7 @@ from functools import wraps, partial
import ecdsa
import base58
from aiohttp import web
from prometheus_client import generate_latest as prom_generate_latest, CONTENT_TYPE_LATEST as PROM_CONTENT_TYPE_LATEST
from google.protobuf.message import DecodeError
from lbry.wallet import (
Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic,
@ -319,6 +320,10 @@ class Daemon(metaclass=JSONRPCServerType):
streaming_app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
self.streaming_runner = web.AppRunner(streaming_app)
prom_app = web.Application()
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
self.metrics_runner = web.AppRunner(prom_app)
@property
def dht_node(self) -> typing.Optional['Node']:
return self.component_manager.get_component(DHT_COMPONENT)
@ -446,6 +451,7 @@ class Daemon(metaclass=JSONRPCServerType):
await self.analytics_manager.send_server_startup()
await self.rpc_runner.setup()
await self.streaming_runner.setup()
await self.metrics_runner.setup()
try:
rpc_site = web.TCPSite(self.rpc_runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5)
@ -467,6 +473,16 @@ class Daemon(metaclass=JSONRPCServerType):
await self.analytics_manager.send_server_startup_error(str(e))
raise SystemExit()
if self.conf.prometheus_port:
try:
metrics_site = web.TCPSite(self.metrics_runner, "0.0.0.0", self.conf.prometheus_port, shutdown_timeout=.5)
await metrics_site.start()
log.info('metrics server listening on TCP %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
except OSError as e:
log.error('metrics server failed to bind TCP :%i', self.conf.prometheus_port)
await self.analytics_manager.send_server_startup_error(str(e))
raise SystemExit()
try:
await self.initialize()
except asyncio.CancelledError:
@ -498,6 +514,7 @@ class Daemon(metaclass=JSONRPCServerType):
log.info("stopped api components")
await self.rpc_runner.cleanup()
await self.streaming_runner.cleanup()
await self.metrics_runner.cleanup()
log.info("stopped api server")
if self.analytics_manager.is_started:
self.analytics_manager.stop()
@ -527,6 +544,16 @@ class Daemon(metaclass=JSONRPCServerType):
content_type='application/json'
)
async def handle_metrics_get_request(self, request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),
content_type='text/plain; version=0.0.4'
)
except Exception:
log.exception('could not generate prometheus data')
raise
async def handle_stream_get_request(self, request: web.Request):
if not self.conf.streaming_get:
log.warning("streaming_get is disabled, rejecting request")
@ -595,7 +622,7 @@ class Daemon(metaclass=JSONRPCServerType):
# TODO: this is for backwards compatibility. Remove this once API and UI are updated
# TODO: also delete EMPTY_PARAMS then
_args, _kwargs = (), args[0]
elif isinstance(args, list) and len(args) == 2 and\
elif isinstance(args, list) and len(args) == 2 and \
isinstance(args[0], list) and isinstance(args[1], dict):
_args, _kwargs = args
else:

View file

@ -749,7 +749,7 @@ class LBRYBlockProcessor(BlockProcessor):
timer = self.timer.sub_timers['advance_blocks']
undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs')
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(20):
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
self.timer.show(height=height)
return undo

View file

@ -59,6 +59,7 @@ class Env:
self.ssl_certfile = self.required('SSL_CERTFILE')
self.ssl_keyfile = self.required('SSL_KEYFILE')
self.rpc_port = self.integer('RPC_PORT', 8000)
self.prometheus_port = self.integer('PROMETHEUS_PORT', 0)
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None)
self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)

View file

@ -0,0 +1,36 @@
from aiohttp import web
from prometheus_client import Counter, generate_latest as prom_generate_latest
from lbry.wallet.server import util
NAMESPACE = "wallet_server"
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE)
class PrometheusServer:
def __init__(self):
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.runner = None
async def start(self, port: int):
prom_app = web.Application()
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
self.runner = web.AppRunner(prom_app)
await self.runner.setup()
metrics_site = web.TCPSite(self.runner, "0.0.0.0", port, shutdown_timeout=.5)
await metrics_site.start()
self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
async def handle_metrics_get_request(self, request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),
content_type='text/plain; version=0.0.4'
)
except Exception:
self.logger.exception('could not generate prometheus data')
raise
async def stop(self):
await self.runner.cleanup()

View file

@ -2,9 +2,11 @@ import signal
import logging
import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
import typing
import lbry
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
from lbry.wallet.server.prometheus import PrometheusServer
class Notifications:
@ -74,6 +76,7 @@ class Server:
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self.db = db = env.coin.DB(env)
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications)
self.prometheus_server: typing.Optional[PrometheusServer] = None
# Set notifications up to implement the MemPoolAPI
notifications.height = daemon.height
@ -107,11 +110,15 @@ class Server:
await self.db.populate_header_merkle_cache()
await _start_cancellable(self.mempool.keep_synchronized)
await _start_cancellable(self.session_mgr.serve, self.notifications)
await _start_cancellable(self.start_prometheus)
async def stop(self):
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
self.shutdown_event.set()
await self.daemon.close()
@ -132,3 +139,8 @@ class Server:
finally:
loop.run_until_complete(self.stop())
executor.shutdown(True)
async def start_prometheus(self, event):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
await self.prometheus_server.start(self.env.prometheus_port)

View file

@ -27,6 +27,7 @@ from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.server.prometheus import REQUESTS_COUNT
from lbry.wallet.rpc import (
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
@ -709,6 +710,7 @@ class SessionBase(RPCSession):
"""Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions.
"""
REQUESTS_COUNT.inc()
if isinstance(request, Request):
handler = self.request_handlers.get(request.method)
else:

View file

@ -44,6 +44,7 @@ setup(
'cryptography==2.5',
'protobuf==3.6.1',
'msgpack==0.6.1',
'prometheus_client==0.7.1',
'ecdsa==0.13.3',
'pyyaml==4.2b1',
'docopt==0.6.2',