From 28eee4da2579274a36e84573c4995d6d463377db Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 4 Feb 2020 10:52:08 -0500 Subject: [PATCH] add version metrics to prometheus --- lbry/wallet/rpc/session.py | 13 ++++++++++--- lbry/wallet/server/prometheus.py | 9 ++++----- lbry/wallet/server/session.py | 13 +++++++------ 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index d14a8964b..425f264b1 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -380,6 +380,7 @@ class RPCSession(SessionBase): def __init__(self, *, framer=None, loop=None, connection=None): super().__init__(framer=framer, loop=loop) self.connection = connection or self.default_connection() + self.client_version = 'unknown' async def _receive_messages(self): while not self.is_closing(): @@ -419,12 +420,18 @@ class RPCSession(SessionBase): 'internal server error') if isinstance(request, Request): message = request.send_result(result) - RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start) + RESPONSE_TIMES.labels( + method=request.method, + version=self.client_version + ).observe(time.perf_counter() - start) if message: await self._send_message(message) if isinstance(result, Exception): self._bump_errors() - REQUEST_ERRORS_COUNT.labels(method=request.method).inc() + REQUEST_ERRORS_COUNT.labels( + method=request.method, + version=self.client_version + ).inc() def connection_lost(self, exc): # Cancel pending requests and message processing @@ -458,8 +465,8 @@ class RPCSession(SessionBase): async def send_notification(self, method, args=()): """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) + NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() await self._send_message(message) - NOTIFICATION_COUNT.labels(method=method).inc() def send_batch(self, raise_errors=False): """Return a BatchRequest. Intended to be used like so: diff --git a/lbry/wallet/server/prometheus.py b/lbry/wallet/server/prometheus.py index e98c920d9..7acde8b91 100644 --- a/lbry/wallet/server/prometheus.py +++ b/lbry/wallet/server/prometheus.py @@ -18,14 +18,13 @@ VERSION_INFO.info({ "cpu_count": os.cpu_count() }) SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE) - REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, - labelnames=("method",)) -RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method",)) + labelnames=("method", "version")) +RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method", "version")) NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", - namespace=NAMESPACE, labelnames=("method",)) + namespace=NAMESPACE, labelnames=("method", "version")) REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE, - labelnames=("method",)) + labelnames=("method", "version")) SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) SQLITE_OPERATIONAL_ERROR_COUNT = Counter( "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6c96a176a..01316fcfd 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -326,7 +326,7 @@ class SessionManager: return [(session.session_id, session.flags(), session.peer_address_str(for_log=for_log), - session.client, + session.client_version, session.protocol_version_string(), session.count_pending_items(), session.txs_sent, @@ -630,7 +630,6 @@ class SessionBase(RPCSession): self.kind = kind # 'RPC', 'TCP' etc. self.env = session_mgr.env self.coin = self.env.coin - self.client = 'unknown' self.anon_logs = self.env.anon_logs self.txs_sent = 0 self.log_me = False @@ -675,7 +674,8 @@ class SessionBase(RPCSession): self.logger = util.ConnectionLogger(self.logger, context) self.group = self.session_mgr.add_session(self) SESSIONS_COUNT.inc() - self.logger.info(f'{self.kind} {self.peer_address_str()}, ' + peer_addr_str = self.peer_address_str() + self.logger.info(f'{self.kind} {peer_addr_str}, ' f'{self.session_mgr.session_count():,d} total') def connection_lost(self, exc): @@ -706,7 +706,7 @@ class SessionBase(RPCSession): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. """ - REQUESTS_COUNT.labels(method=request.method).inc() + REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc() if isinstance(request, Request): handler = self.request_handlers.get(request.method) handler = partial(handler, self) @@ -1416,6 +1416,7 @@ class LBRYElectrumX(SessionBase): client_name: a string identifying the client protocol_version: the protocol version spoken by the client """ + if self.sv_seen and self.protocol_tuple >= (1, 4): raise RPCError(BAD_REQUEST, f'server.version already sent') self.sv_seen = True @@ -1427,8 +1428,8 @@ class LBRYElectrumX(SessionBase): self.close_after_send = True raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}') - self.client = client_name[:17] - CLIENT_VERSIONS.labels(version=str(client_name)).inc() + self.client_version = client_name[:17] + CLIENT_VERSIONS.labels(version=self.client_version).inc() # Find the highest common protocol version. Disconnect if # that protocol version in unsupported.