Merge pull request #2764 from lbryio/version-analytics
Add version tracking to prometheus metrics
This commit is contained in:
commit
ef36e0311e
4 changed files with 41 additions and 22 deletions
|
@ -380,6 +380,7 @@ class RPCSession(SessionBase):
|
||||||
def __init__(self, *, framer=None, loop=None, connection=None):
|
def __init__(self, *, framer=None, loop=None, connection=None):
|
||||||
super().__init__(framer=framer, loop=loop)
|
super().__init__(framer=framer, loop=loop)
|
||||||
self.connection = connection or self.default_connection()
|
self.connection = connection or self.default_connection()
|
||||||
|
self.client_version = 'unknown'
|
||||||
|
|
||||||
async def _receive_messages(self):
|
async def _receive_messages(self):
|
||||||
while not self.is_closing():
|
while not self.is_closing():
|
||||||
|
@ -419,12 +420,18 @@ class RPCSession(SessionBase):
|
||||||
'internal server error')
|
'internal server error')
|
||||||
if isinstance(request, Request):
|
if isinstance(request, Request):
|
||||||
message = request.send_result(result)
|
message = request.send_result(result)
|
||||||
RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start)
|
RESPONSE_TIMES.labels(
|
||||||
|
method=request.method,
|
||||||
|
version=self.client_version
|
||||||
|
).observe(time.perf_counter() - start)
|
||||||
if message:
|
if message:
|
||||||
await self._send_message(message)
|
await self._send_message(message)
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
self._bump_errors()
|
self._bump_errors()
|
||||||
REQUEST_ERRORS_COUNT.labels(method=request.method).inc()
|
REQUEST_ERRORS_COUNT.labels(
|
||||||
|
method=request.method,
|
||||||
|
version=self.client_version
|
||||||
|
).inc()
|
||||||
|
|
||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
# Cancel pending requests and message processing
|
# Cancel pending requests and message processing
|
||||||
|
@ -458,8 +465,8 @@ class RPCSession(SessionBase):
|
||||||
async def send_notification(self, method, args=()):
|
async def send_notification(self, method, args=()):
|
||||||
"""Send an RPC notification over the network."""
|
"""Send an RPC notification over the network."""
|
||||||
message = self.connection.send_notification(Notification(method, args))
|
message = self.connection.send_notification(Notification(method, args))
|
||||||
|
NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
|
||||||
await self._send_message(message)
|
await self._send_message(message)
|
||||||
NOTIFICATION_COUNT.labels(method=method).inc()
|
|
||||||
|
|
||||||
def send_batch(self, raise_errors=False):
|
def send_batch(self, raise_errors=False):
|
||||||
"""Return a BatchRequest. Intended to be used like so:
|
"""Return a BatchRequest. Intended to be used like so:
|
||||||
|
|
|
@ -9,6 +9,7 @@ from lbry.wallet.server.daemon import DaemonError
|
||||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||||
from lbry.wallet.server.util import chunks, class_logger
|
from lbry.wallet.server.util import chunks, class_logger
|
||||||
from lbry.wallet.server.leveldb import FlushData
|
from lbry.wallet.server.leveldb import FlushData
|
||||||
|
from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES
|
||||||
|
|
||||||
|
|
||||||
class Prefetcher:
|
class Prefetcher:
|
||||||
|
@ -188,14 +189,15 @@ class BlockProcessor:
|
||||||
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
|
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
|
||||||
|
|
||||||
if hprevs == chain:
|
if hprevs == chain:
|
||||||
start = time.time()
|
start = time.perf_counter()
|
||||||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||||
await self._maybe_flush()
|
await self._maybe_flush()
|
||||||
|
processed_time = time.perf_counter() - start
|
||||||
|
BLOCK_COUNT.set(self.height)
|
||||||
|
BLOCK_UPDATE_TIMES.observe(processed_time)
|
||||||
if not self.db.first_sync:
|
if not self.db.first_sync:
|
||||||
s = '' if len(blocks) == 1 else 's'
|
s = '' if len(blocks) == 1 else 's'
|
||||||
self.logger.info('processed {:,d} block{} in {:.1f}s'
|
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
|
||||||
.format(len(blocks), s,
|
|
||||||
time.time() - start))
|
|
||||||
if self._caught_up_event.is_set():
|
if self._caught_up_event.is_set():
|
||||||
await self.notifications.on_block(self.touched, self.height)
|
await self.notifications.on_block(self.touched, self.height)
|
||||||
self.touched = set()
|
self.touched = set()
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import os
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge
|
from prometheus_client import Counter, Info, generate_latest as prom_generate_latest, Histogram, Gauge
|
||||||
from lbry import __version__ as version
|
from lbry import __version__ as version
|
||||||
|
@ -6,7 +7,7 @@ from lbry.wallet.server import util
|
||||||
import lbry.wallet.server.version as wallet_server_version
|
import lbry.wallet.server.version as wallet_server_version
|
||||||
|
|
||||||
NAMESPACE = "wallet_server"
|
NAMESPACE = "wallet_server"
|
||||||
|
CPU_COUNT = f"{os.cpu_count()}"
|
||||||
VERSION_INFO = Info('build', 'Wallet server build info (e.g. version, commit hash)', namespace=NAMESPACE)
|
VERSION_INFO = Info('build', 'Wallet server build info (e.g. version, commit hash)', namespace=NAMESPACE)
|
||||||
VERSION_INFO.info({
|
VERSION_INFO.info({
|
||||||
'build': BUILD,
|
'build': BUILD,
|
||||||
|
@ -14,16 +15,17 @@ VERSION_INFO.info({
|
||||||
"docker_tag": DOCKER_TAG,
|
"docker_tag": DOCKER_TAG,
|
||||||
'version': version,
|
'version': version,
|
||||||
"min_version": util.version_string(wallet_server_version.PROTOCOL_MIN),
|
"min_version": util.version_string(wallet_server_version.PROTOCOL_MIN),
|
||||||
|
"cpu_count": CPU_COUNT
|
||||||
})
|
})
|
||||||
SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE)
|
SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
|
||||||
|
labelnames=("version", ))
|
||||||
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
|
REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
|
||||||
labelnames=("method",))
|
labelnames=("method", "version"))
|
||||||
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method",))
|
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method", "version"))
|
||||||
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
|
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
|
||||||
namespace=NAMESPACE, labelnames=("method",))
|
namespace=NAMESPACE, labelnames=("method", "version"))
|
||||||
REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE,
|
REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=NAMESPACE,
|
||||||
labelnames=("method",))
|
labelnames=("method", "version"))
|
||||||
SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
||||||
SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
|
SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
|
||||||
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
||||||
|
@ -45,6 +47,10 @@ CLIENT_VERSIONS = Counter(
|
||||||
"clients", "Number of connections received per client version",
|
"clients", "Number of connections received per client version",
|
||||||
namespace=NAMESPACE, labelnames=("version",)
|
namespace=NAMESPACE, labelnames=("version",)
|
||||||
)
|
)
|
||||||
|
BLOCK_COUNT = Gauge(
|
||||||
|
"block_count", "Number of processed blocks", namespace=NAMESPACE
|
||||||
|
)
|
||||||
|
BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAMESPACE)
|
||||||
|
|
||||||
|
|
||||||
class PrometheusServer:
|
class PrometheusServer:
|
||||||
|
|
|
@ -326,7 +326,7 @@ class SessionManager:
|
||||||
return [(session.session_id,
|
return [(session.session_id,
|
||||||
session.flags(),
|
session.flags(),
|
||||||
session.peer_address_str(for_log=for_log),
|
session.peer_address_str(for_log=for_log),
|
||||||
session.client,
|
session.client_version,
|
||||||
session.protocol_version_string(),
|
session.protocol_version_string(),
|
||||||
session.count_pending_items(),
|
session.count_pending_items(),
|
||||||
session.txs_sent,
|
session.txs_sent,
|
||||||
|
@ -630,7 +630,6 @@ class SessionBase(RPCSession):
|
||||||
self.kind = kind # 'RPC', 'TCP' etc.
|
self.kind = kind # 'RPC', 'TCP' etc.
|
||||||
self.env = session_mgr.env
|
self.env = session_mgr.env
|
||||||
self.coin = self.env.coin
|
self.coin = self.env.coin
|
||||||
self.client = 'unknown'
|
|
||||||
self.anon_logs = self.env.anon_logs
|
self.anon_logs = self.env.anon_logs
|
||||||
self.txs_sent = 0
|
self.txs_sent = 0
|
||||||
self.log_me = False
|
self.log_me = False
|
||||||
|
@ -674,15 +673,16 @@ class SessionBase(RPCSession):
|
||||||
context = {'conn_id': f'{self.session_id}'}
|
context = {'conn_id': f'{self.session_id}'}
|
||||||
self.logger = util.ConnectionLogger(self.logger, context)
|
self.logger = util.ConnectionLogger(self.logger, context)
|
||||||
self.group = self.session_mgr.add_session(self)
|
self.group = self.session_mgr.add_session(self)
|
||||||
SESSIONS_COUNT.inc()
|
SESSIONS_COUNT.labels(version=self.client_version).inc()
|
||||||
self.logger.info(f'{self.kind} {self.peer_address_str()}, '
|
peer_addr_str = self.peer_address_str()
|
||||||
|
self.logger.info(f'{self.kind} {peer_addr_str}, '
|
||||||
f'{self.session_mgr.session_count():,d} total')
|
f'{self.session_mgr.session_count():,d} total')
|
||||||
|
|
||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
"""Handle client disconnection."""
|
"""Handle client disconnection."""
|
||||||
super().connection_lost(exc)
|
super().connection_lost(exc)
|
||||||
self.session_mgr.remove_session(self)
|
self.session_mgr.remove_session(self)
|
||||||
SESSIONS_COUNT.dec()
|
SESSIONS_COUNT.labels(version=self.client_version).dec()
|
||||||
msg = ''
|
msg = ''
|
||||||
if not self._can_send.is_set():
|
if not self._can_send.is_set():
|
||||||
msg += ' whilst paused'
|
msg += ' whilst paused'
|
||||||
|
@ -706,7 +706,7 @@ class SessionBase(RPCSession):
|
||||||
"""Handle an incoming request. ElectrumX doesn't receive
|
"""Handle an incoming request. ElectrumX doesn't receive
|
||||||
notifications from client sessions.
|
notifications from client sessions.
|
||||||
"""
|
"""
|
||||||
REQUESTS_COUNT.labels(method=request.method).inc()
|
REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc()
|
||||||
if isinstance(request, Request):
|
if isinstance(request, Request):
|
||||||
handler = self.request_handlers.get(request.method)
|
handler = self.request_handlers.get(request.method)
|
||||||
handler = partial(handler, self)
|
handler = partial(handler, self)
|
||||||
|
@ -1416,6 +1416,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
client_name: a string identifying the client
|
client_name: a string identifying the client
|
||||||
protocol_version: the protocol version spoken by the client
|
protocol_version: the protocol version spoken by the client
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.sv_seen and self.protocol_tuple >= (1, 4):
|
if self.sv_seen and self.protocol_tuple >= (1, 4):
|
||||||
raise RPCError(BAD_REQUEST, f'server.version already sent')
|
raise RPCError(BAD_REQUEST, f'server.version already sent')
|
||||||
self.sv_seen = True
|
self.sv_seen = True
|
||||||
|
@ -1427,8 +1428,11 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.close_after_send = True
|
self.close_after_send = True
|
||||||
raise RPCError(BAD_REQUEST,
|
raise RPCError(BAD_REQUEST,
|
||||||
f'unsupported client: {client_name}')
|
f'unsupported client: {client_name}')
|
||||||
self.client = client_name[:17]
|
if self.client_version != client_name[:17]:
|
||||||
CLIENT_VERSIONS.labels(version=str(client_name)).inc()
|
SESSIONS_COUNT.labels(version=self.client_version).dec()
|
||||||
|
self.client_version = client_name[:17]
|
||||||
|
SESSIONS_COUNT.labels(version=self.client_version).inc()
|
||||||
|
CLIENT_VERSIONS.labels(version=self.client_version).inc()
|
||||||
|
|
||||||
# Find the highest common protocol version. Disconnect if
|
# Find the highest common protocol version. Disconnect if
|
||||||
# that protocol version in unsupported.
|
# that protocol version in unsupported.
|
||||||
|
|
Loading…
Add table
Reference in a new issue