add wallet server prometheus metrics

-sessions gauge
-requests counter
-response times histogram
-notification counter
-request errors counter
-interrupt counter
-operational error counter
-internal error counter
-reader executor times histogram
-pending queries gauge
-lbrycrd request counter
-client versions counter
This commit is contained in:
Jack Robison 2020-01-28 12:24:57 -05:00
parent 3b2d635390
commit 1c474352fe
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 78 additions and 15 deletions

View file

@ -40,6 +40,7 @@ from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Noti
from .jsonrpc import RPCError, ProtocolError
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
from .util import Concurrency
from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT
class Connector:
@ -405,6 +406,7 @@ class RPCSession(SessionBase):
await self._task_group.add(self._handle_request(request))
async def _handle_request(self, request):
start = time.perf_counter()
try:
result = await self.handle_request(request)
except (ProtocolError, RPCError) as e:
@ -417,10 +419,12 @@ class RPCSession(SessionBase):
'internal server error')
if isinstance(request, Request):
message = request.send_result(result)
RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start)
if message:
await self._send_message(message)
if isinstance(result, Exception):
self._bump_errors()
REQUEST_ERRORS_COUNT.labels(method=request.method).inc()
def connection_lost(self, exc):
# Cancel pending requests and message processing
@ -455,6 +459,7 @@ class RPCSession(SessionBase):
"""Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args))
await self._send_message(message)
NOTIFICATION_COUNT.labels(method=method).inc()
def send_batch(self, raise_errors=False):
"""Return a BatchRequest. Intended to be used like so:

View file

@ -9,7 +9,7 @@ import aiohttp
from lbry.wallet.rpc.jsonrpc import RPCError
from lbry.wallet.server.util import hex_to_bytes, class_logger
from lbry.wallet.rpc import JSONRPC
from lbry.wallet.server.prometheus import LBRYCRD_REQUEST_TIMES, LBRYCRD_PENDING_COUNT
class DaemonError(Exception):
"""Raised when the daemon returns an error in its results."""
@ -106,6 +106,7 @@ class Daemon:
Handles temporary connection issues. Daemon response errors
are raise through DaemonError.
"""
def log_error(error):
nonlocal last_error_log, retry
now = time.time()
@ -119,8 +120,13 @@ class Daemon:
last_error_log = 0
data = json.dumps(payload)
retry = self.init_retry
methods = tuple(
[payload['method']] if isinstance(payload, dict) else [request['method'] for request in payload]
)
while True:
try:
for method in methods:
LBRYCRD_PENDING_COUNT.labels(method=method).inc()
result = await self._send_data(data)
result = processor(result)
if on_good_message:
@ -143,12 +149,17 @@ class Daemon:
except WorkQueueFullError:
log_error('work queue full.')
on_good_message = 'running normally'
finally:
for method in methods:
LBRYCRD_PENDING_COUNT.labels(method=method).dec()
await asyncio.sleep(retry)
retry = max(min(self.max_retry, retry * 2), self.init_retry)
async def _send_single(self, method, params=None):
"""Send a single request to the daemon."""
start = time.perf_counter()
def processor(result):
err = result['error']
if not err:
@ -160,7 +171,9 @@ class Daemon:
payload = {'method': method, 'id': next(self.id_counter)}
if params:
payload['params'] = params
return await self._send(payload, processor)
result = await self._send(payload, processor)
LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start)
return result
async def _send_vector(self, method, params_iterable, replace_errs=False):
"""Send several requests of the same method.
@ -168,6 +181,9 @@ class Daemon:
The result will be an array of the same length as params_iterable.
If replace_errs is true, any item with an error is returned as None,
otherwise an exception is raised."""
start = time.perf_counter()
def processor(result):
errs = [item['error'] for item in result if item['error']]
if any(err.get('code') == self.WARMING_UP for err in errs):
@ -178,9 +194,11 @@ class Daemon:
payload = [{'method': method, 'params': p, 'id': next(self.id_counter)}
for p in params_iterable]
result = []
if payload:
return await self._send(payload, processor)
return []
result = await self._send(payload, processor)
LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start)
return result
async def _is_rpc_available(self, method):
"""Return whether given RPC method is available in the daemon.

View file

@ -1,5 +1,5 @@
from aiohttp import web
from prometheus_client import Counter, Info, generate_latest as prom_generate_latest
from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge
from lbry import __version__ as version
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server import util
@ -15,7 +15,36 @@ VERSION_INFO.info({
'version': version,
"min_version": util.version_string(wallet_server_version.PROTOCOL_MIN),
})
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, labelnames=("method",))
SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE)
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method",))
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method",))
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=NAMESPACE, labelnames=("method",))
REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE,
labelnames=("method",))
SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
)
SQLITE_INTERNAL_ERROR_COUNT = Counter(
"internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
)
SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE)
SQLITE_PENDING_COUNT = Gauge(
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
)
LBRYCRD_REQUEST_TIMES = Histogram(
"lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",)
)
LBRYCRD_PENDING_COUNT = Gauge(
"lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE, labelnames=("method",)
)
CLIENT_VERSIONS = Counter(
"clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",)
)
class PrometheusServer:

View file

@ -27,7 +27,9 @@ from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.server.prometheus import REQUESTS_COUNT
from lbry.wallet.server.prometheus import REQUESTS_COUNT, SQLITE_INTERRUPT_COUNT, SQLITE_INTERNAL_ERROR_COUNT
from lbry.wallet.server.prometheus import SQLITE_OPERATIONAL_ERROR_COUNT, SQLITE_EXECUTOR_TIMES, SESSIONS_COUNT
from lbry.wallet.server.prometheus import SQLITE_PENDING_COUNT, CLIENT_VERSIONS
import lbry.wallet.server.version as VERSION
from lbry.wallet.rpc import (
@ -679,6 +681,7 @@ class SessionBase(RPCSession):
context = {'conn_id': f'{self.session_id}'}
self.logger = util.ConnectionLogger(self.logger, context)
self.group = self.session_mgr.add_session(self)
SESSIONS_COUNT.inc()
self.logger.info(f'{self.kind} {self.peer_address_str()}, '
f'{self.session_mgr.session_count():,d} total')
@ -686,6 +689,7 @@ class SessionBase(RPCSession):
"""Handle client disconnection."""
super().connection_lost(exc)
self.session_mgr.remove_session(self)
SESSIONS_COUNT.dec()
msg = ''
if not self._can_send.is_set():
msg += ' whilst paused'
@ -888,6 +892,7 @@ class LBRYElectrumX(SessionBase):
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
SQLITE_PENDING_COUNT.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
@ -896,23 +901,28 @@ class LBRYElectrumX(SessionBase):
except reader.SQLiteInterruptedError as error:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_interrupt(start, error.metrics)
SQLITE_INTERRUPT_COUNT.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
except reader.SQLiteOperationalError as error:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, error.metrics)
SQLITE_OPERATIONAL_ERROR_COUNT.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
except Exception:
log.exception("dear devs, please handle this exception better")
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, {})
SQLITE_INTERNAL_ERROR_COUNT.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
if self.env.track_metrics:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
(result, metrics_data) = result
metrics.query_response(start, metrics_data)
return base64.b64encode(result).decode()
else:
if self.env.track_metrics:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
(result, metrics_data) = result
metrics.query_response(start, metrics_data)
return base64.b64encode(result).decode()
finally:
SQLITE_PENDING_COUNT.dec()
SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start)
async def run_and_cache_query(self, query_name, function, kwargs):
metrics = self.get_metrics_or_placeholder_for_api(query_name)
@ -1370,6 +1380,7 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST,
f'unsupported client: {client_name}')
self.client = client_name[:17]
CLIENT_VERSIONS.labels(version=str(client_name)).inc()
# Find the highest common protocol version. Disconnect if
# that protocol version in unsupported.