Revert "Add prometheus metrics collection to client and server"
This reverts commit 59a5bacb2e
.
This commit is contained in:
parent
a64f33dbcc
commit
1299c9162c
8 changed files with 2 additions and 82 deletions
|
@ -502,7 +502,6 @@ class Config(CLIConfig):
|
||||||
)
|
)
|
||||||
udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port'])
|
udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port'])
|
||||||
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
|
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
|
||||||
prometheus_port = Integer("Port to expose prometheus metrics (off by default)", 0)
|
|
||||||
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
|
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
|
||||||
|
|
||||||
# routing table
|
# routing table
|
||||||
|
|
|
@ -17,7 +17,6 @@ from functools import wraps, partial
|
||||||
import ecdsa
|
import ecdsa
|
||||||
import base58
|
import base58
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from prometheus_client import generate_latest as prom_generate_latest
|
|
||||||
from google.protobuf.message import DecodeError
|
from google.protobuf.message import DecodeError
|
||||||
from lbry.wallet import (
|
from lbry.wallet import (
|
||||||
Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic,
|
Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic,
|
||||||
|
@ -320,10 +319,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
streaming_app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
|
streaming_app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
|
||||||
self.streaming_runner = web.AppRunner(streaming_app)
|
self.streaming_runner = web.AppRunner(streaming_app)
|
||||||
|
|
||||||
prom_app = web.Application()
|
|
||||||
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
|
||||||
self.metrics_runner = web.AppRunner(prom_app)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dht_node(self) -> typing.Optional['Node']:
|
def dht_node(self) -> typing.Optional['Node']:
|
||||||
return self.component_manager.get_component(DHT_COMPONENT)
|
return self.component_manager.get_component(DHT_COMPONENT)
|
||||||
|
@ -451,7 +446,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
await self.analytics_manager.send_server_startup()
|
await self.analytics_manager.send_server_startup()
|
||||||
await self.rpc_runner.setup()
|
await self.rpc_runner.setup()
|
||||||
await self.streaming_runner.setup()
|
await self.streaming_runner.setup()
|
||||||
await self.metrics_runner.setup()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
rpc_site = web.TCPSite(self.rpc_runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5)
|
rpc_site = web.TCPSite(self.rpc_runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5)
|
||||||
|
@ -473,16 +467,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
await self.analytics_manager.send_server_startup_error(str(e))
|
await self.analytics_manager.send_server_startup_error(str(e))
|
||||||
raise SystemExit()
|
raise SystemExit()
|
||||||
|
|
||||||
if self.conf.prometheus_port:
|
|
||||||
try:
|
|
||||||
metrics = web.TCPSite(self.metrics_runner, "0.0.0.0", self.conf.prometheus_port, shutdown_timeout=.5)
|
|
||||||
await metrics.start()
|
|
||||||
log.info('metrics server listening on TCP %s:%i', *metrics._server.sockets[0].getsockname()[:2])
|
|
||||||
except OSError as e:
|
|
||||||
log.error('metrics server failed to bind TCP :%i', self.conf.prometheus_port)
|
|
||||||
await self.analytics_manager.send_server_startup_error(str(e))
|
|
||||||
raise SystemExit()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.initialize()
|
await self.initialize()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
@ -514,7 +498,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
log.info("stopped api components")
|
log.info("stopped api components")
|
||||||
await self.rpc_runner.cleanup()
|
await self.rpc_runner.cleanup()
|
||||||
await self.streaming_runner.cleanup()
|
await self.streaming_runner.cleanup()
|
||||||
await self.metrics_runner.cleanup()
|
|
||||||
log.info("stopped api server")
|
log.info("stopped api server")
|
||||||
if self.analytics_manager.is_started:
|
if self.analytics_manager.is_started:
|
||||||
self.analytics_manager.stop()
|
self.analytics_manager.stop()
|
||||||
|
@ -544,16 +527,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
content_type='application/json'
|
content_type='application/json'
|
||||||
)
|
)
|
||||||
|
|
||||||
async def handle_metrics_get_request(self, request: web.Request):
|
|
||||||
try:
|
|
||||||
return web.Response(
|
|
||||||
text=prom_generate_latest().decode(),
|
|
||||||
content_type='text/plain; version=0.0.4'
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
log.exception('could not generate prometheus data')
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def handle_stream_get_request(self, request: web.Request):
|
async def handle_stream_get_request(self, request: web.Request):
|
||||||
if not self.conf.streaming_get:
|
if not self.conf.streaming_get:
|
||||||
log.warning("streaming_get is disabled, rejecting request")
|
log.warning("streaming_get is disabled, rejecting request")
|
||||||
|
@ -622,7 +595,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
# TODO: this is for backwards compatibility. Remove this once API and UI are updated
|
# TODO: this is for backwards compatibility. Remove this once API and UI are updated
|
||||||
# TODO: also delete EMPTY_PARAMS then
|
# TODO: also delete EMPTY_PARAMS then
|
||||||
_args, _kwargs = (), args[0]
|
_args, _kwargs = (), args[0]
|
||||||
elif isinstance(args, list) and len(args) == 2 and \
|
elif isinstance(args, list) and len(args) == 2 and\
|
||||||
isinstance(args[0], list) and isinstance(args[1], dict):
|
isinstance(args[0], list) and isinstance(args[1], dict):
|
||||||
_args, _kwargs = args
|
_args, _kwargs = args
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -749,7 +749,7 @@ class LBRYBlockProcessor(BlockProcessor):
|
||||||
timer = self.timer.sub_timers['advance_blocks']
|
timer = self.timer.sub_timers['advance_blocks']
|
||||||
undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs')
|
undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs')
|
||||||
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
|
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
|
||||||
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
|
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(20):
|
||||||
self.timer.show(height=height)
|
self.timer.show(height=height)
|
||||||
return undo
|
return undo
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,6 @@ class Env:
|
||||||
self.ssl_certfile = self.required('SSL_CERTFILE')
|
self.ssl_certfile = self.required('SSL_CERTFILE')
|
||||||
self.ssl_keyfile = self.required('SSL_KEYFILE')
|
self.ssl_keyfile = self.required('SSL_KEYFILE')
|
||||||
self.rpc_port = self.integer('RPC_PORT', 8000)
|
self.rpc_port = self.integer('RPC_PORT', 8000)
|
||||||
self.prometheus_port = self.integer('PROMETHEUS_PORT', 0)
|
|
||||||
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
|
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
|
||||||
self.banner_file = self.default('BANNER_FILE', None)
|
self.banner_file = self.default('BANNER_FILE', None)
|
||||||
self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
|
self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
from aiohttp import web
|
|
||||||
from prometheus_client import Counter, generate_latest as prom_generate_latest
|
|
||||||
from lbry.wallet.server import util
|
|
||||||
|
|
||||||
NAMESPACE = "wallet_server"
|
|
||||||
|
|
||||||
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE)
|
|
||||||
|
|
||||||
|
|
||||||
class PrometheusServer:
|
|
||||||
def __init__(self):
|
|
||||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
|
||||||
self.runner = None
|
|
||||||
|
|
||||||
async def start(self, port: int):
|
|
||||||
prom_app = web.Application()
|
|
||||||
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
|
||||||
self.runner = web.AppRunner(prom_app)
|
|
||||||
await self.runner.setup()
|
|
||||||
|
|
||||||
metrics_site = web.TCPSite(self.runner, "0.0.0.0", port, shutdown_timeout=.5)
|
|
||||||
await metrics_site.start()
|
|
||||||
self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
|
|
||||||
|
|
||||||
async def handle_metrics_get_request(self, request: web.Request):
|
|
||||||
try:
|
|
||||||
return web.Response(
|
|
||||||
text=prom_generate_latest().decode(),
|
|
||||||
content_type='text/plain; version=0.0.4'
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
self.logger.exception('could not generate prometheus data')
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def stop(self):
|
|
||||||
await self.runner.cleanup()
|
|
|
@ -2,11 +2,9 @@ import signal
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
from concurrent.futures.thread import ThreadPoolExecutor
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
import typing
|
|
||||||
|
|
||||||
import lbry
|
import lbry
|
||||||
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
|
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
|
||||||
from lbry.wallet.server.prometheus import PrometheusServer
|
|
||||||
|
|
||||||
|
|
||||||
class Notifications:
|
class Notifications:
|
||||||
|
@ -76,7 +74,6 @@ class Server:
|
||||||
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
|
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
|
||||||
self.db = db = env.coin.DB(env)
|
self.db = db = env.coin.DB(env)
|
||||||
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications)
|
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications)
|
||||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
|
||||||
|
|
||||||
# Set notifications up to implement the MemPoolAPI
|
# Set notifications up to implement the MemPoolAPI
|
||||||
notifications.height = daemon.height
|
notifications.height = daemon.height
|
||||||
|
@ -110,15 +107,11 @@ class Server:
|
||||||
await self.db.populate_header_merkle_cache()
|
await self.db.populate_header_merkle_cache()
|
||||||
await _start_cancellable(self.mempool.keep_synchronized)
|
await _start_cancellable(self.mempool.keep_synchronized)
|
||||||
await _start_cancellable(self.session_mgr.serve, self.notifications)
|
await _start_cancellable(self.session_mgr.serve, self.notifications)
|
||||||
await _start_cancellable(self.start_prometheus)
|
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
for task in reversed(self.cancellable_tasks):
|
for task in reversed(self.cancellable_tasks):
|
||||||
task.cancel()
|
task.cancel()
|
||||||
await asyncio.wait(self.cancellable_tasks)
|
await asyncio.wait(self.cancellable_tasks)
|
||||||
if self.prometheus_server:
|
|
||||||
await self.prometheus_server.stop()
|
|
||||||
self.prometheus_server = None
|
|
||||||
self.shutdown_event.set()
|
self.shutdown_event.set()
|
||||||
await self.daemon.close()
|
await self.daemon.close()
|
||||||
|
|
||||||
|
@ -139,8 +132,3 @@ class Server:
|
||||||
finally:
|
finally:
|
||||||
loop.run_until_complete(self.stop())
|
loop.run_until_complete(self.stop())
|
||||||
executor.shutdown(True)
|
executor.shutdown(True)
|
||||||
|
|
||||||
async def start_prometheus(self, event):
|
|
||||||
if not self.prometheus_server and self.env.prometheus_port:
|
|
||||||
self.prometheus_server = PrometheusServer()
|
|
||||||
await self.prometheus_server.start(self.env.prometheus_port)
|
|
||||||
|
|
|
@ -27,7 +27,6 @@ from lbry.wallet.server.db.writer import LBRYLevelDB
|
||||||
from lbry.wallet.server.db import reader
|
from lbry.wallet.server.db import reader
|
||||||
from lbry.wallet.server.websocket import AdminWebSocket
|
from lbry.wallet.server.websocket import AdminWebSocket
|
||||||
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
|
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
|
||||||
from lbry.wallet.server.prometheus import REQUESTS_COUNT
|
|
||||||
|
|
||||||
from lbry.wallet.rpc import (
|
from lbry.wallet.rpc import (
|
||||||
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
|
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
|
||||||
|
@ -708,7 +707,6 @@ class SessionBase(RPCSession):
|
||||||
"""Handle an incoming request. ElectrumX doesn't receive
|
"""Handle an incoming request. ElectrumX doesn't receive
|
||||||
notifications from client sessions.
|
notifications from client sessions.
|
||||||
"""
|
"""
|
||||||
REQUESTS_COUNT.inc()
|
|
||||||
if isinstance(request, Request):
|
if isinstance(request, Request):
|
||||||
handler = self.request_handlers.get(request.method)
|
handler = self.request_handlers.get(request.method)
|
||||||
else:
|
else:
|
||||||
|
|
1
setup.py
1
setup.py
|
@ -44,7 +44,6 @@ setup(
|
||||||
'cryptography==2.5',
|
'cryptography==2.5',
|
||||||
'protobuf==3.6.1',
|
'protobuf==3.6.1',
|
||||||
'msgpack==0.6.1',
|
'msgpack==0.6.1',
|
||||||
'prometheus_client==0.7.1',
|
|
||||||
'ecdsa==0.13.3',
|
'ecdsa==0.13.3',
|
||||||
'pyyaml==4.2b1',
|
'pyyaml==4.2b1',
|
||||||
'docopt==0.6.2',
|
'docopt==0.6.2',
|
||||||
|
|
Loading…
Reference in a new issue