Merge pull request #2749 from lbryio/prometheus-interrupts

add metrics to wallet server prometheus
Jack Robison, 2020-01-28 16:31:10 -05:00 (committed by GitHub)
commit 7d5e19716a
4 changed files with 125 additions and 65 deletions
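
This commit instruments the wallet server with the three core prometheus_client metric types. As a minimal sketch of how they behave (illustrative names, not from this commit): a Counter only increases, a Gauge moves both ways, and a Histogram buckets observed values; `.labels()` selects a per-label child series.

```python
from prometheus_client import Counter, Gauge, Histogram

ERRORS = Counter("errors", "Total errors", namespace="demo", labelnames=("method",))
ERRORS.labels(method="ping").inc()        # counters only go up

IN_FLIGHT = Gauge("in_flight", "Requests currently in flight", namespace="demo")
IN_FLIGHT.inc()                           # gauges go up...
IN_FLIGHT.dec()                           # ...and back down

LATENCY = Histogram("latency", "Request latency in seconds", namespace="demo")
LATENCY.observe(0.25)                     # one sample, assigned to a bucket
```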

lbry/wallet/rpc/session.py

@@ -40,6 +40,7 @@ from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Noti
 from .jsonrpc import RPCError, ProtocolError
 from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
 from .util import Concurrency
+from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT


 class Connector:
@@ -405,6 +406,7 @@ class RPCSession(SessionBase):
         await self._task_group.add(self._handle_request(request))

     async def _handle_request(self, request):
+        start = time.perf_counter()
         try:
             result = await self.handle_request(request)
         except (ProtocolError, RPCError) as e:
@@ -417,10 +419,12 @@ class RPCSession(SessionBase):
                               'internal server error')
         if isinstance(request, Request):
             message = request.send_result(result)
+            RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start)
             if message:
                 await self._send_message(message)
         if isinstance(result, Exception):
             self._bump_errors()
+            REQUEST_ERRORS_COUNT.labels(method=request.method).inc()

     def connection_lost(self, exc):
         # Cancel pending requests and message processing
@@ -455,6 +459,7 @@ class RPCSession(SessionBase):
         """Send an RPC notification over the network."""
         message = self.connection.send_notification(Notification(method, args))
         await self._send_message(message)
+        NOTIFICATION_COUNT.labels(method=method).inc()

     def send_batch(self, raise_errors=False):
         """Return a BatchRequest. Intended to be used like so:

lbry/wallet/server/daemon.py

@@ -9,7 +9,7 @@ import aiohttp
 from lbry.wallet.rpc.jsonrpc import RPCError
 from lbry.wallet.server.util import hex_to_bytes, class_logger
 from lbry.wallet.rpc import JSONRPC
+from lbry.wallet.server.prometheus import LBRYCRD_REQUEST_TIMES, LBRYCRD_PENDING_COUNT


 class DaemonError(Exception):
     """Raised when the daemon returns an error in its results."""
@@ -106,6 +106,7 @@ class Daemon:
         Handles temporary connection issues. Daemon response errors
         are raise through DaemonError.
         """
+
         def log_error(error):
             nonlocal last_error_log, retry
             now = time.time()
@@ -119,8 +120,13 @@ class Daemon:
         last_error_log = 0
         data = json.dumps(payload)
         retry = self.init_retry
+        methods = tuple(
+            [payload['method']] if isinstance(payload, dict) else [request['method'] for request in payload]
+        )
         while True:
             try:
+                for method in methods:
+                    LBRYCRD_PENDING_COUNT.labels(method=method).inc()
                 result = await self._send_data(data)
                 result = processor(result)
                 if on_good_message:
@@ -143,12 +149,17 @@ class Daemon:
             except WorkQueueFullError:
                 log_error('work queue full.')
                 on_good_message = 'running normally'
+            finally:
+                for method in methods:
+                    LBRYCRD_PENDING_COUNT.labels(method=method).dec()
             await asyncio.sleep(retry)
             retry = max(min(self.max_retry, retry * 2), self.init_retry)

     async def _send_single(self, method, params=None):
         """Send a single request to the daemon."""
+        start = time.perf_counter()

         def processor(result):
             err = result['error']
             if not err:
@@ -160,7 +171,9 @@ class Daemon:
         payload = {'method': method, 'id': next(self.id_counter)}
         if params:
             payload['params'] = params
-        return await self._send(payload, processor)
+        result = await self._send(payload, processor)
+        LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start)
+        return result

     async def _send_vector(self, method, params_iterable, replace_errs=False):
         """Send several requests of the same method.
@@ -168,6 +181,9 @@ class Daemon:
         The result will be an array of the same length as params_iterable.
         If replace_errs is true, any item with an error is returned as None,
         otherwise an exception is raised."""
+        start = time.perf_counter()

         def processor(result):
             errs = [item['error'] for item in result if item['error']]
             if any(err.get('code') == self.WARMING_UP for err in errs):
@@ -178,9 +194,11 @@ class Daemon:
         payload = [{'method': method, 'params': p, 'id': next(self.id_counter)}
                    for p in params_iterable]
+        result = []
         if payload:
-            return await self._send(payload, processor)
-        return []
+            result = await self._send(payload, processor)
+        LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start)
+        return result

     async def _is_rpc_available(self, method):
         """Return whether given RPC method is available in the daemon.

lbry/wallet/server/prometheus.py

@@ -1,5 +1,5 @@
 from aiohttp import web
-from prometheus_client import Counter, Info, generate_latest as prom_generate_latest
+from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge
 from lbry import __version__ as version
 from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
 from lbry.wallet.server import util
@@ -15,7 +15,36 @@ VERSION_INFO.info({
     'version': version,
     "min_version": util.version_string(wallet_server_version.PROTOCOL_MIN),
 })
-REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, labelnames=("method",))
+SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE)
+REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
+                         labelnames=("method",))
+RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method",))
+NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
+                             namespace=NAMESPACE, labelnames=("method",))
+REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE,
+                               labelnames=("method",))
+SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
+SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
+    "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
+)
+SQLITE_INTERNAL_ERROR_COUNT = Counter(
+    "internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
+)
+SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE)
+SQLITE_PENDING_COUNT = Gauge(
+    "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
+)
+LBRYCRD_REQUEST_TIMES = Histogram(
+    "lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",)
+)
+LBRYCRD_PENDING_COUNT = Gauge(
+    "lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE, labelnames=("method",)
+)
+CLIENT_VERSIONS = Counter(
+    "clients", "Number of connections received per client version",
+    namespace=NAMESPACE, labelnames=("version",)
+)


 class PrometheusServer:
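
`PrometheusServer` itself is untouched by this commit; for context, exposing these metrics over aiohttp (already imported at the top of this module) typically amounts to the sketch below. The handler and route names are illustrative, not necessarily what the class actually uses:

```python
from aiohttp import web
from prometheus_client import generate_latest

async def serve_metrics(request: web.Request) -> web.Response:
    # generate_latest() renders every metric in the default registry
    # in the Prometheus text exposition format.
    return web.Response(body=generate_latest(), content_type="text/plain")

app = web.Application()
app.router.add_get("/metrics", serve_metrics)
# web.run_app(app, port=2112)  # then scrape http://localhost:2112/metrics
```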

lbry/wallet/server/session.py

@@ -27,7 +27,9 @@ from lbry.wallet.server.db.writer import LBRYLevelDB
 from lbry.wallet.server.db import reader
 from lbry.wallet.server.websocket import AdminWebSocket
 from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
-from lbry.wallet.server.prometheus import REQUESTS_COUNT
+from lbry.wallet.server.prometheus import REQUESTS_COUNT, SQLITE_INTERRUPT_COUNT, SQLITE_INTERNAL_ERROR_COUNT
+from lbry.wallet.server.prometheus import SQLITE_OPERATIONAL_ERROR_COUNT, SQLITE_EXECUTOR_TIMES, SESSIONS_COUNT
+from lbry.wallet.server.prometheus import SQLITE_PENDING_COUNT, CLIENT_VERSIONS
 import lbry.wallet.server.version as VERSION

 from lbry.wallet.rpc import (
@@ -142,13 +144,6 @@ class SessionManager:
         self.session_event = Event()

-        # Set up the RPC request handlers
-        cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
-                'query reorg sessions stop'.split())
-        LocalRPC.request_handlers.update(
-            {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
-        )
-
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
         if kind == 'RPC':
@@ -679,6 +674,7 @@ class SessionBase(RPCSession):
         context = {'conn_id': f'{self.session_id}'}
         self.logger = util.ConnectionLogger(self.logger, context)
         self.group = self.session_mgr.add_session(self)
+        SESSIONS_COUNT.inc()
         self.logger.info(f'{self.kind} {self.peer_address_str()}, '
                          f'{self.session_mgr.session_count():,d} total')
@@ -686,6 +682,7 @@ class SessionBase(RPCSession):
         """Handle client disconnection."""
         super().connection_lost(exc)
         self.session_mgr.remove_session(self)
+        SESSIONS_COUNT.dec()
         msg = ''
         if not self._can_send.is_set():
             msg += ' whilst paused'
@@ -712,6 +709,7 @@ class SessionBase(RPCSession):
         REQUESTS_COUNT.labels(method=request.method).inc()
         if isinstance(request, Request):
             handler = self.request_handlers.get(request.method)
+            handler = partial(handler, self)
         else:
             handler = None
         coro = handler_invocation(handler, request)()
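
Because the handlers become class-level entries in this commit (plain functions registered once, see the LBRYElectrumX hunk below), they no longer arrive bound to a session; `partial(handler, self)` supplies the instance at dispatch time. A self-contained sketch of that design, with hypothetical names:

```python
import asyncio
from functools import partial

class Session:
    request_handlers = {}  # shared class-level registry, filled once

    async def ping(self):
        return "pong"

Session.request_handlers["server.ping"] = Session.ping  # unbound function

async def dispatch(session, method):
    handler = session.request_handlers.get(method)
    return await partial(handler, session)()  # bind the instance explicitly

print(asyncio.run(dispatch(Session(), "server.ping")))  # -> pong
```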
@@ -785,16 +783,56 @@ class LBRYElectrumX(SessionBase):
     session_mgr: LBRYSessionManager
     version = lbry.__version__

+    @classmethod
+    def initialize_request_handlers(cls):
+        cls.request_handlers.update({
+            'blockchain.block.get_chunk': cls.block_get_chunk,
+            'blockchain.block.get_header': cls.block_get_header,
+            'blockchain.estimatefee': cls.estimatefee,
+            'blockchain.relayfee': cls.relayfee,
+            'blockchain.scripthash.get_balance': cls.scripthash_get_balance,
+            'blockchain.scripthash.get_history': cls.scripthash_get_history,
+            'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool,
+            'blockchain.scripthash.listunspent': cls.scripthash_listunspent,
+            'blockchain.scripthash.subscribe': cls.scripthash_subscribe,
+            'blockchain.transaction.broadcast': cls.transaction_broadcast,
+            'blockchain.transaction.get': cls.transaction_get,
+            'blockchain.transaction.get_merkle': cls.transaction_merkle,
+            'server.add_peer': cls.add_peer,
+            'server.banner': cls.banner,
+            'server.donation_address': cls.donation_address,
+            'server.features': cls.server_features_async,
+            'server.peers.subscribe': cls.peers_subscribe,
+            'server.version': cls.server_version,
+            'blockchain.transaction.get_height': cls.transaction_get_height,
+            'blockchain.claimtrie.search': cls.claimtrie_search,
+            'blockchain.claimtrie.resolve': cls.claimtrie_resolve,
+            'blockchain.claimtrie.getclaimsbyids': cls.claimtrie_getclaimsbyids,
+            'blockchain.block.get_server_height': cls.get_server_height,
+            'mempool.get_fee_histogram': cls.mempool_compact_histogram,
+            'blockchain.block.headers': cls.block_headers,
+            'server.ping': cls.ping,
+            'blockchain.headers.subscribe': cls.headers_subscribe_False,
+            'blockchain.address.get_balance': cls.address_get_balance,
+            'blockchain.address.get_history': cls.address_get_history,
+            'blockchain.address.get_mempool': cls.address_get_mempool,
+            'blockchain.address.listunspent': cls.address_listunspent,
+            'blockchain.address.subscribe': cls.address_subscribe,
+            'blockchain.address.unsubscribe': cls.address_unsubscribe,
+        })
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
+        if not LBRYElectrumX.request_handlers:
+            LBRYElectrumX.initialize_request_handlers()
         self.subscribe_headers = False
         self.subscribe_headers_raw = False
         self.connection.max_response_size = self.env.max_send
         self.hashX_subs = {}
         self.sv_seen = False
         self.mempool_statuses = {}
-        self.set_request_handlers(self.PROTOCOL_MIN)
+        self.protocol_tuple = self.PROTOCOL_MIN
+
+        # fixme: this is a rebase hack, we need to go through ChainState instead later
         self.daemon = self.session_mgr.daemon
         self.bp: LBRYBlockProcessor = self.session_mgr.bp
         self.db: LBRYLevelDB = self.bp.db
@@ -888,6 +926,7 @@ class LBRYElectrumX(SessionBase):
     async def run_in_executor(self, query_name, func, kwargs):
         start = time.perf_counter()
         try:
+            SQLITE_PENDING_COUNT.inc()
             result = await asyncio.get_running_loop().run_in_executor(
                 self.session_mgr.query_executor, func, kwargs
             )
@@ -896,23 +935,28 @@ class LBRYElectrumX(SessionBase):
         except reader.SQLiteInterruptedError as error:
             metrics = self.get_metrics_or_placeholder_for_api(query_name)
             metrics.query_interrupt(start, error.metrics)
+            SQLITE_INTERRUPT_COUNT.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
         except reader.SQLiteOperationalError as error:
             metrics = self.get_metrics_or_placeholder_for_api(query_name)
             metrics.query_error(start, error.metrics)
+            SQLITE_OPERATIONAL_ERROR_COUNT.inc()
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
         except Exception:
             log.exception("dear devs, please handle this exception better")
             metrics = self.get_metrics_or_placeholder_for_api(query_name)
             metrics.query_error(start, {})
+            SQLITE_INTERNAL_ERROR_COUNT.inc()
             raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
-
-        if self.env.track_metrics:
-            metrics = self.get_metrics_or_placeholder_for_api(query_name)
-            (result, metrics_data) = result
-            metrics.query_response(start, metrics_data)
-
-        return base64.b64encode(result).decode()
+        else:
+            if self.env.track_metrics:
+                metrics = self.get_metrics_or_placeholder_for_api(query_name)
+                (result, metrics_data) = result
+                metrics.query_response(start, metrics_data)
+            return base64.b64encode(result).decode()
+        finally:
+            SQLITE_PENDING_COUNT.dec()
+            SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start)

     async def run_and_cache_query(self, query_name, function, kwargs):
         metrics = self.get_metrics_or_placeholder_for_api(query_name)
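
The restructure above moves the success path into `else` and the bookkeeping into `finally`: `else` runs only when no exception escaped the `try`, while `finally` runs on every exit, so the pending gauge and executor-time histogram are recorded for successes, interrupts, and errors alike. The same shape in isolation (a sketch; `func` stands in for the executor call):

```python
import time
from prometheus_client import Gauge, Histogram

PENDING = Gauge("pending_queries", "Queries in flight", namespace="demo")
TIMES = Histogram("executor_time", "Query wall time in seconds", namespace="demo")

def run_query(func):
    start = time.perf_counter()
    try:
        PENDING.inc()
        result = func()
    except Exception:
        raise   # the real code maps this to RPCError and bumps an error counter
    else:
        return result       # success path only
    finally:
        PENDING.dec()       # every path
        TIMES.observe(time.perf_counter() - start)
```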
@@ -935,6 +979,9 @@ class LBRYElectrumX(SessionBase):
             metrics.cache_response()
             return cache_item.result

+    async def mempool_compact_histogram(self):
+        return self.mempool.compact_fee_histogram()
+
     async def claimtrie_search(self, **kwargs):
         if kwargs:
             return await self.run_and_cache_query('search', reader.search_to_bytes, kwargs)
@@ -1370,6 +1417,7 @@ class LBRYElectrumX(SessionBase):
                 raise RPCError(BAD_REQUEST,
                                f'unsupported client: {client_name}')
             self.client = client_name[:17]
+            CLIENT_VERSIONS.labels(version=str(client_name)).inc()

         # Find the highest common protocol version.  Disconnect if
         # that protocol version in unsupported.
@@ -1384,8 +1432,7 @@ class LBRYElectrumX(SessionBase):
             self.close_after_send = True
             raise RPCError(BAD_REQUEST,
                            f'unsupported protocol version: {protocol_version}')
-        self.set_request_handlers(ptuple)
-
+        self.protocol_tuple = ptuple
         return self.version, self.protocol_version_string()

     async def transaction_broadcast(self, raw_tx):
@@ -1479,45 +1526,6 @@ class LBRYElectrumX(SessionBase):
         else:
             return tx_hash

-    def set_request_handlers(self, ptuple):
-        self.protocol_tuple = ptuple
-        handlers = {
-            'blockchain.block.get_chunk': self.block_get_chunk,
-            'blockchain.block.get_header': self.block_get_header,
-            'blockchain.estimatefee': self.estimatefee,
-            'blockchain.relayfee': self.relayfee,
-            'blockchain.scripthash.get_balance': self.scripthash_get_balance,
-            'blockchain.scripthash.get_history': self.scripthash_get_history,
-            'blockchain.scripthash.get_mempool': self.scripthash_get_mempool,
-            'blockchain.scripthash.listunspent': self.scripthash_listunspent,
-            'blockchain.scripthash.subscribe': self.scripthash_subscribe,
-            'blockchain.transaction.broadcast': self.transaction_broadcast,
-            'blockchain.transaction.get': self.transaction_get,
-            'blockchain.transaction.get_merkle': self.transaction_merkle,
-            'server.add_peer': self.add_peer,
-            'server.banner': self.banner,
-            'server.donation_address': self.donation_address,
-            'server.features': self.server_features_async,
-            'server.peers.subscribe': self.peers_subscribe,
-            'server.version': self.server_version,
-            'blockchain.transaction.get_height': self.transaction_get_height,
-            'blockchain.claimtrie.search': self.claimtrie_search,
-            'blockchain.claimtrie.resolve': self.claimtrie_resolve,
-            'blockchain.claimtrie.getclaimsbyids': self.claimtrie_getclaimsbyids,
-            'blockchain.block.get_server_height': self.get_server_height,
-            'mempool.get_fee_histogram': self.mempool.compact_fee_histogram,
-            'blockchain.block.headers': self.block_headers,
-            'server.ping': self.ping,
-            'blockchain.headers.subscribe': self.headers_subscribe_False,
-            'blockchain.address.get_balance': self.address_get_balance,
-            'blockchain.address.get_history': self.address_get_history,
-            'blockchain.address.get_mempool': self.address_get_mempool,
-            'blockchain.address.listunspent': self.address_listunspent,
-            'blockchain.address.subscribe': self.address_subscribe,
-            'blockchain.address.unsubscribe': self.address_unsubscribe,
-        }
-        self.request_handlers = handlers


 class LocalRPC(SessionBase):
     """A local TCP RPC server session."""