add version metrics to prometheus

This commit is contained in:
Jack Robison 2020-02-04 10:52:08 -05:00
parent 933ccf6deb
commit 28eee4da25
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 21 additions and 14 deletions

View file

@ -380,6 +380,7 @@ class RPCSession(SessionBase):
def __init__(self, *, framer=None, loop=None, connection=None): def __init__(self, *, framer=None, loop=None, connection=None):
super().__init__(framer=framer, loop=loop) super().__init__(framer=framer, loop=loop)
self.connection = connection or self.default_connection() self.connection = connection or self.default_connection()
self.client_version = 'unknown'
async def _receive_messages(self): async def _receive_messages(self):
while not self.is_closing(): while not self.is_closing():
@ -419,12 +420,18 @@ class RPCSession(SessionBase):
'internal server error') 'internal server error')
if isinstance(request, Request): if isinstance(request, Request):
message = request.send_result(result) message = request.send_result(result)
RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start) RESPONSE_TIMES.labels(
method=request.method,
version=self.client_version
).observe(time.perf_counter() - start)
if message: if message:
await self._send_message(message) await self._send_message(message)
if isinstance(result, Exception): if isinstance(result, Exception):
self._bump_errors() self._bump_errors()
REQUEST_ERRORS_COUNT.labels(method=request.method).inc() REQUEST_ERRORS_COUNT.labels(
method=request.method,
version=self.client_version
).inc()
def connection_lost(self, exc): def connection_lost(self, exc):
# Cancel pending requests and message processing # Cancel pending requests and message processing
@ -458,8 +465,8 @@ class RPCSession(SessionBase):
async def send_notification(self, method, args=()): async def send_notification(self, method, args=()):
"""Send an RPC notification over the network.""" """Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args)) message = self.connection.send_notification(Notification(method, args))
NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
await self._send_message(message) await self._send_message(message)
NOTIFICATION_COUNT.labels(method=method).inc()
def send_batch(self, raise_errors=False): def send_batch(self, raise_errors=False):
"""Return a BatchRequest. Intended to be used like so: """Return a BatchRequest. Intended to be used like so:

View file

@ -18,14 +18,13 @@ VERSION_INFO.info({
"cpu_count": os.cpu_count() "cpu_count": os.cpu_count()
}) })
SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE) SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE)
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method",)) labelnames=("method", "version"))
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method",)) RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method", "version"))
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=NAMESPACE, labelnames=("method",)) namespace=NAMESPACE, labelnames=("method", "version"))
REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE, REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE,
labelnames=("method",)) labelnames=("method", "version"))
SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
SQLITE_OPERATIONAL_ERROR_COUNT = Counter( SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE

View file

@ -326,7 +326,7 @@ class SessionManager:
return [(session.session_id, return [(session.session_id,
session.flags(), session.flags(),
session.peer_address_str(for_log=for_log), session.peer_address_str(for_log=for_log),
session.client, session.client_version,
session.protocol_version_string(), session.protocol_version_string(),
session.count_pending_items(), session.count_pending_items(),
session.txs_sent, session.txs_sent,
@ -630,7 +630,6 @@ class SessionBase(RPCSession):
self.kind = kind # 'RPC', 'TCP' etc. self.kind = kind # 'RPC', 'TCP' etc.
self.env = session_mgr.env self.env = session_mgr.env
self.coin = self.env.coin self.coin = self.env.coin
self.client = 'unknown'
self.anon_logs = self.env.anon_logs self.anon_logs = self.env.anon_logs
self.txs_sent = 0 self.txs_sent = 0
self.log_me = False self.log_me = False
@ -675,7 +674,8 @@ class SessionBase(RPCSession):
self.logger = util.ConnectionLogger(self.logger, context) self.logger = util.ConnectionLogger(self.logger, context)
self.group = self.session_mgr.add_session(self) self.group = self.session_mgr.add_session(self)
SESSIONS_COUNT.inc() SESSIONS_COUNT.inc()
self.logger.info(f'{self.kind} {self.peer_address_str()}, ' peer_addr_str = self.peer_address_str()
self.logger.info(f'{self.kind} {peer_addr_str}, '
f'{self.session_mgr.session_count():,d} total') f'{self.session_mgr.session_count():,d} total')
def connection_lost(self, exc): def connection_lost(self, exc):
@ -706,7 +706,7 @@ class SessionBase(RPCSession):
"""Handle an incoming request. ElectrumX doesn't receive """Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions. notifications from client sessions.
""" """
REQUESTS_COUNT.labels(method=request.method).inc() REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc()
if isinstance(request, Request): if isinstance(request, Request):
handler = self.request_handlers.get(request.method) handler = self.request_handlers.get(request.method)
handler = partial(handler, self) handler = partial(handler, self)
@ -1416,6 +1416,7 @@ class LBRYElectrumX(SessionBase):
client_name: a string identifying the client client_name: a string identifying the client
protocol_version: the protocol version spoken by the client protocol_version: the protocol version spoken by the client
""" """
if self.sv_seen and self.protocol_tuple >= (1, 4): if self.sv_seen and self.protocol_tuple >= (1, 4):
raise RPCError(BAD_REQUEST, f'server.version already sent') raise RPCError(BAD_REQUEST, f'server.version already sent')
self.sv_seen = True self.sv_seen = True
@ -1427,8 +1428,8 @@ class LBRYElectrumX(SessionBase):
self.close_after_send = True self.close_after_send = True
raise RPCError(BAD_REQUEST, raise RPCError(BAD_REQUEST,
f'unsupported client: {client_name}') f'unsupported client: {client_name}')
self.client = client_name[:17] self.client_version = client_name[:17]
CLIENT_VERSIONS.labels(version=str(client_name)).inc() CLIENT_VERSIONS.labels(version=self.client_version).inc()
# Find the highest common protocol version. Disconnect if # Find the highest common protocol version. Disconnect if
# that protocol version in unsupported. # that protocol version in unsupported.