refactor prometheus metrics

This commit is contained in:
Jack Robison 2020-04-23 21:17:44 -04:00
parent 36c05fc4b9
commit 797364ee5c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
9 changed files with 145 additions and 164 deletions

View file

@ -19,7 +19,7 @@ from functools import wraps, partial
import ecdsa
import base58
from aiohttp import web
from prometheus_client import generate_latest as prom_generate_latest
from prometheus_client import generate_latest as prom_generate_latest, Gauge, Histogram, Counter
from google.protobuf.message import DecodeError
from lbry.wallet import (
Wallet, ENCRYPT_ON_DISK, SingleKey, HierarchicalDeterministic,
@ -297,6 +297,20 @@ class Daemon(metaclass=JSONRPCServerType):
callable_methods: dict
deprecated_methods: dict
pending_requests_metric = Gauge(
"pending_requests", "Number of running api requests", namespace="daemon_api",
labelnames=("method",)
)
requests_count_metric = Counter(
"requests_count", "Number of requests received", namespace="daemon_api",
labelnames=("method",)
)
response_time_metric = Histogram(
"response_time", "Response times", namespace="daemon_api",
labelnames=("method",)
)
def __init__(self, conf: Config, component_manager: typing.Optional[ComponentManager] = None):
self.conf = conf
self.platform_info = system_info.get_platform()
@ -457,7 +471,6 @@ class Daemon(metaclass=JSONRPCServerType):
log.info("Starting LBRYNet Daemon")
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
log.info("Platform: %s", json.dumps(self.platform_info, indent=2))
self.need_connection_status_refresh.set()
self._connection_status_task = self.component_manager.loop.create_task(
self.keep_connection_status_up_to_date()
@ -663,7 +676,9 @@ class Daemon(metaclass=JSONRPCServerType):
JSONRPCError.CODE_INVALID_PARAMS,
params_error_message,
)
self.pending_requests_metric.labels(method=function_name).inc()
self.requests_count_metric.labels(method=function_name).inc()
start = time.perf_counter()
try:
result = method(self, *_args, **_kwargs)
if asyncio.iscoroutine(result):
@ -677,6 +692,9 @@ class Daemon(metaclass=JSONRPCServerType):
return JSONRPCError.create_command_exception(
command=function_name, args=_args, kwargs=_kwargs, exception=e, traceback=format_exc()
)
finally:
self.pending_requests_metric.labels(method=function_name).dec()
self.response_time_metric.labels(method=function_name).observe(time.perf_counter() - start)
def _verify_method_is_callable(self, function_path):
if function_path not in self.callable_methods:

View file

@ -3,6 +3,7 @@ import logging
import asyncio
import sqlite3
import platform
import time
from binascii import hexlify
from dataclasses import dataclass
from contextvars import ContextVar
@ -10,6 +11,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional
from datetime import date
from prometheus_client import Gauge
from .bip32 import PubKey
from .transaction import Transaction, Output, OutputScript, TXRefImmutable
@ -64,6 +66,13 @@ else:
class AIOSQLite:
reader_executor: ReaderExecutorClass
waiting_writes_metric = Gauge(
"waiting_writes_count", "Number of waiting db writes", namespace="daemon_database"
)
waiting_reads_metric = Gauge(
"waiting_reads_count", "Number of waiting db writes", namespace="daemon_database"
)
def __init__(self):
# has to be single threaded as there is no mapping of thread:connection
self.writer_executor = ThreadPoolExecutor(max_workers=1)
@ -117,6 +126,7 @@ class AIOSQLite:
still_waiting = False
urgent_read = False
if read_only:
self.waiting_reads_metric.inc()
try:
while self.writers: # more writes can come in while we are waiting for the first
if not urgent_read and still_waiting and self.urgent_read_done.is_set():
@ -133,6 +143,7 @@ class AIOSQLite:
if urgent_read:
# unthrottle the writers if they had to be throttled
self.urgent_read_done.set()
self.waiting_reads_metric.dec()
if fetch_all:
return await self.run(lambda conn: conn.execute(sql, parameters).fetchall())
return await self.run(lambda conn: conn.execute(sql, parameters).fetchone())
@ -150,7 +161,12 @@ class AIOSQLite:
return self.run(lambda conn: conn.execute(sql, parameters))
async def run(self, fun, *args, **kwargs):
self.waiting_writes_metric.inc()
try:
await self.urgent_read_done.wait()
except Exception as e:
self.waiting_writes_metric.dec()
raise e
self.writers += 1
self.read_ready.clear()
try:
@ -160,6 +176,7 @@ class AIOSQLite:
)
finally:
self.writers -= 1
self.waiting_writes_metric.dec()
if not self.writers:
self.read_ready.set()

View file

@ -33,13 +33,12 @@ from asyncio import Event, CancelledError
import logging
import time
from contextlib import suppress
from prometheus_client import Counter, Histogram
from lbry.wallet.tasks import TaskGroup
from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification
from .jsonrpc import RPCError, ProtocolError
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
from lbry.wallet.server import prometheus
class Connector:
@ -372,10 +371,26 @@ class BatchRequest:
raise BatchError(self)
NAMESPACE = "wallet_server"
class RPCSession(SessionBase):
"""Base class for protocols where a message can lead to a response,
for example JSON RPC."""
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE,
labelnames=("method", "version"))
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=NAMESPACE, labelnames=("method", "version"))
REQUEST_ERRORS_COUNT = Counter(
"request_error", "Number of requests that returned errors", namespace=NAMESPACE,
labelnames=("method", "version")
)
RESET_CONNECTIONS = Counter(
"reset_clients", "Number of reset connections by client version",
namespace=NAMESPACE, labelnames=("version",)
)
def __init__(self, *, framer=None, loop=None, connection=None):
super().__init__(framer=framer, loop=loop)
self.connection = connection or self.default_connection()
@ -388,7 +403,7 @@ class RPCSession(SessionBase):
except MemoryError:
self.logger.warning('received oversized message from %s:%s, dropping connection',
self._address[0], self._address[1])
prometheus.METRICS.RESET_CONNECTIONS.labels(version=self.client_version).inc()
self.RESET_CONNECTIONS.labels(version=self.client_version).inc()
self._close()
return
@ -422,7 +437,7 @@ class RPCSession(SessionBase):
'internal server error')
if isinstance(request, Request):
message = request.send_result(result)
prometheus.METRICS.RESPONSE_TIMES.labels(
self.RESPONSE_TIMES.labels(
method=request.method,
version=self.client_version
).observe(time.perf_counter() - start)
@ -430,7 +445,7 @@ class RPCSession(SessionBase):
await self._send_message(message)
if isinstance(result, Exception):
self._bump_errors()
prometheus.METRICS.REQUEST_ERRORS_COUNT.labels(
self.REQUEST_ERRORS_COUNT.labels(
method=request.method,
version=self.client_version
).inc()
@ -467,7 +482,7 @@ class RPCSession(SessionBase):
async def send_notification(self, method, args=()):
"""Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args))
prometheus.METRICS.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
await self._send_message(message)
def send_batch(self, raise_errors=False):

View file

@ -3,6 +3,7 @@ import asyncio
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional
from prometheus_client import Gauge, Histogram
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.db.writer import SQLDB
@ -10,7 +11,6 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server import prometheus
class Prefetcher:
@ -129,6 +129,9 @@ class ChainError(Exception):
"""Raised on error processing blocks."""
NAMESPACE = "wallet_server"
class BlockProcessor:
"""Process blocks and update the DB state to match.
@ -136,6 +139,14 @@ class BlockProcessor:
Coordinate backing up in case of chain reorganisations.
"""
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
block_update_time_metric = Histogram("block_time", "Block update times", namespace=NAMESPACE)
reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
def __init__(self, env, db, daemon, notifications):
self.env = env
self.db = db
@ -199,8 +210,8 @@ class BlockProcessor:
cache.clear()
await self._maybe_flush()
processed_time = time.perf_counter() - start
prometheus.METRICS.BLOCK_COUNT.set(self.height)
prometheus.METRICS.BLOCK_UPDATE_TIMES.observe(processed_time)
self.block_count_metric.set(self.height)
self.block_update_time_metric.observe(processed_time)
if not self.db.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
@ -255,7 +266,7 @@ class BlockProcessor:
last -= len(raw_blocks)
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
prometheus.METRICS.REORG_COUNT.inc()
self.reorg_count_metric.inc()
async def reorg_hashes(self, count):
"""Return a pair (start, last, hashes) of blocks to back up during a

View file

@ -6,11 +6,11 @@ from functools import wraps
from pylru import lrucache
import aiohttp
from prometheus_client import Gauge, Histogram
from lbry.wallet.rpc.jsonrpc import RPCError
from lbry.wallet.server.util import hex_to_bytes, class_logger
from lbry.wallet.rpc import JSONRPC
from lbry.wallet.server import prometheus
class DaemonError(Exception):
@ -25,12 +25,23 @@ class WorkQueueFullError(Exception):
"""Internal - when the daemon's work queue is full."""
NAMESPACE = "wallet_server"
class Daemon:
"""Handles connections to a daemon at the given URL."""
WARMING_UP = -28
id_counter = itertools.count()
lbrycrd_request_time_metric = Histogram(
"lbrycrd_request", "lbrycrd requests count", namespace=NAMESPACE, labelnames=("method",)
)
lbrycrd_pending_count_metric = Gauge(
"lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=NAMESPACE,
labelnames=("method",)
)
def __init__(self, coin, url, max_workqueue=10, init_retry=0.25,
max_retry=4.0):
self.coin = coin
@ -130,7 +141,7 @@ class Daemon:
while True:
try:
for method in methods:
prometheus.METRICS.LBRYCRD_PENDING_COUNT.labels(method=method).inc()
self.lbrycrd_pending_count_metric.labels(method=method).inc()
result = await self._send_data(data)
result = processor(result)
if on_good_message:
@ -155,7 +166,7 @@ class Daemon:
on_good_message = 'running normally'
finally:
for method in methods:
prometheus.METRICS.LBRYCRD_PENDING_COUNT.labels(method=method).dec()
self.lbrycrd_pending_count_metric.labels(method=method).dec()
await asyncio.sleep(retry)
retry = max(min(self.max_retry, retry * 2), self.init_retry)
@ -176,7 +187,7 @@ class Daemon:
if params:
payload['params'] = params
result = await self._send(payload, processor)
prometheus.METRICS.LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter() - start)
self.lbrycrd_request_time_metric.labels(method=method).observe(time.perf_counter() - start)
return result
async def _send_vector(self, method, params_iterable, replace_errs=False):
@ -201,7 +212,7 @@ class Daemon:
result = []
if payload:
result = await self._send(payload, processor)
prometheus.METRICS.LBRYCRD_REQUEST_TIMES.labels(method=method).observe(time.perf_counter()-start)
self.lbrycrd_request_time_metric.labels(method=method).observe(time.perf_counter() - start)
return result
async def _is_rpc_available(self, method):

View file

@ -1,125 +0,0 @@
import os
from prometheus_client import Counter, Info, Histogram, Gauge
from lbry import __version__ as version
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server import util
import lbry.wallet.server.version as wallet_server_version
class PrometheusMetrics:
VERSION_INFO: Info
SESSIONS_COUNT: Gauge
REQUESTS_COUNT: Counter
RESPONSE_TIMES: Histogram
NOTIFICATION_COUNT: Counter
REQUEST_ERRORS_COUNT: Counter
SQLITE_INTERRUPT_COUNT: Counter
SQLITE_OPERATIONAL_ERROR_COUNT: Counter
SQLITE_INTERNAL_ERROR_COUNT: Counter
SQLITE_EXECUTOR_TIMES: Histogram
SQLITE_PENDING_COUNT: Gauge
LBRYCRD_REQUEST_TIMES: Histogram
LBRYCRD_PENDING_COUNT: Gauge
CLIENT_VERSIONS: Counter
BLOCK_COUNT: Gauge
BLOCK_UPDATE_TIMES: Histogram
REORG_COUNT: Gauge
RESET_CONNECTIONS: Counter
__slots__ = [
'VERSION_INFO',
'SESSIONS_COUNT',
'REQUESTS_COUNT',
'RESPONSE_TIMES',
'NOTIFICATION_COUNT',
'REQUEST_ERRORS_COUNT',
'SQLITE_INTERRUPT_COUNT',
'SQLITE_OPERATIONAL_ERROR_COUNT',
'SQLITE_INTERNAL_ERROR_COUNT',
'SQLITE_EXECUTOR_TIMES',
'SQLITE_PENDING_COUNT',
'LBRYCRD_REQUEST_TIMES',
'LBRYCRD_PENDING_COUNT',
'CLIENT_VERSIONS',
'BLOCK_COUNT',
'BLOCK_UPDATE_TIMES',
'REORG_COUNT',
'RESET_CONNECTIONS',
'_installed',
'namespace',
'cpu_count'
]
def __init__(self):
self._installed = False
self.namespace = "wallet_server"
self.cpu_count = f"{os.cpu_count()}"
def uninstall(self):
self._installed = False
for item in self.__slots__:
if not item.startswith('_') and item not in ('namespace', 'cpu_count'):
current = getattr(self, item, None)
if current:
setattr(self, item, None)
del current
def install(self):
if self._installed:
return
self._installed = True
self.VERSION_INFO = Info('build', 'Wallet server build info (e.g. version, commit hash)', namespace=self.namespace)
self.VERSION_INFO.info({
'build': BUILD,
"commit": COMMIT_HASH,
"docker_tag": DOCKER_TAG,
'version': version,
"min_version": util.version_string(wallet_server_version.PROTOCOL_MIN),
"cpu_count": self.cpu_count
})
self.SESSIONS_COUNT = Gauge("session_count", "Number of connected client sessions", namespace=self.namespace,
labelnames=("version",))
self.REQUESTS_COUNT = Counter("requests_count", "Number of requests received", namespace=self.namespace,
labelnames=("method", "version"))
self.RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=self.namespace,
labelnames=("method", "version"))
self.NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=self.namespace, labelnames=("method", "version"))
self.REQUEST_ERRORS_COUNT = Counter("request_error", "Number of requests that returned errors", namespace=self.namespace,
labelnames=("method", "version"))
self.SQLITE_INTERRUPT_COUNT = Counter("interrupt", "Number of interrupted queries", namespace=self.namespace)
self.SQLITE_OPERATIONAL_ERROR_COUNT = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=self.namespace
)
self.SQLITE_INTERNAL_ERROR_COUNT = Counter(
"internal_error", "Number of queries raising unexpected errors", namespace=self.namespace
)
self.SQLITE_EXECUTOR_TIMES = Histogram("executor_time", "SQLite executor times", namespace=self.namespace)
self.SQLITE_PENDING_COUNT = Gauge(
"pending_queries_count", "Number of pending and running sqlite queries", namespace=self.namespace
)
self.LBRYCRD_REQUEST_TIMES = Histogram(
"lbrycrd_request", "lbrycrd requests count", namespace=self.namespace, labelnames=("method",)
)
self.LBRYCRD_PENDING_COUNT = Gauge(
"lbrycrd_pending_count", "Number of lbrycrd rpcs that are in flight", namespace=self.namespace,
labelnames=("method",)
)
self.CLIENT_VERSIONS = Counter(
"clients", "Number of connections received per client version",
namespace=self.namespace, labelnames=("version",)
)
self.BLOCK_COUNT = Gauge(
"block_count", "Number of processed blocks", namespace=self.namespace
)
self.BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=self.namespace)
self.REORG_COUNT = Gauge(
"reorg_count", "Number of reorgs", namespace=self.namespace
)
self.RESET_CONNECTIONS = Counter(
"reset_clients", "Number of reset connections by client version",
namespace=self.namespace, labelnames=("version",)
)
METRICS = PrometheusMetrics()

View file

@ -7,7 +7,6 @@ import typing
import lbry
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
from lbry.prometheus import PrometheusServer
from lbry.wallet.server.prometheus import METRICS
class Notifications:
@ -93,7 +92,6 @@ class Server:
)
async def start(self):
METRICS.install()
env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {lbry.__version__}')
@ -123,7 +121,6 @@ class Server:
self.prometheus_server = None
self.shutdown_event.set()
await self.daemon.close()
METRICS.uninstall()
def run(self):
loop = asyncio.get_event_loop()

View file

@ -20,14 +20,15 @@ from functools import partial
from binascii import hexlify
from pylru import lrucache
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from prometheus_client import Counter, Info, Histogram, Gauge
import lbry
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.server import prometheus
from lbry.wallet.rpc.framing import NewlineFramer
import lbry.wallet.server.version as VERSION
@ -117,9 +118,45 @@ class SessionGroup:
self.semaphore = asyncio.Semaphore(20)
NAMESPACE = "wallet_server"
class SessionManager:
"""Holds global state about all sessions."""
version_info_metric = Info(
'build', 'Wallet server build info (e.g. version, commit hash)', namespace=NAMESPACE
)
version_info_metric.info({
'build': BUILD,
"commit": COMMIT_HASH,
"docker_tag": DOCKER_TAG,
'version': lbry.__version__,
"min_version": util.version_string(VERSION.PROTOCOL_MIN),
"cpu_count": os.cpu_count()
})
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",))
request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method", "version"))
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
)
db_error_metric = Counter(
"internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
)
executor_time_metric = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE)
pending_query_metric = Gauge(
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
)
client_version_metric = Counter(
"clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",)
)
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send)
@ -675,7 +712,7 @@ class SessionBase(RPCSession):
context = {'conn_id': f'{self.session_id}'}
self.logger = util.ConnectionLogger(self.logger, context)
self.group = self.session_mgr.add_session(self)
prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).inc()
self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
peer_addr_str = self.peer_address_str()
self.logger.info(f'{self.kind} {peer_addr_str}, '
f'{self.session_mgr.session_count():,d} total')
@ -684,7 +721,7 @@ class SessionBase(RPCSession):
"""Handle client disconnection."""
super().connection_lost(exc)
self.session_mgr.remove_session(self)
prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).dec()
self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
msg = ''
if not self._can_send.is_set():
msg += ' whilst paused'
@ -708,7 +745,7 @@ class SessionBase(RPCSession):
"""Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions.
"""
prometheus.METRICS.REQUESTS_COUNT.labels(method=request.method, version=self.client_version).inc()
self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc()
if isinstance(request, Request):
handler = self.request_handlers.get(request.method)
handler = partial(handler, self)
@ -944,7 +981,7 @@ class LBRYElectrumX(SessionBase):
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
prometheus.METRICS.SQLITE_PENDING_COUNT.inc()
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
@ -953,18 +990,18 @@ class LBRYElectrumX(SessionBase):
except reader.SQLiteInterruptedError as error:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_interrupt(start, error.metrics)
prometheus.METRICS.prometheus.METRICS.SQLITE_INTERRUPT_COUNT.inc()
self.session_mgr.self.session_mgr.SQLITE_INTERRUPT_COUNT.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
except reader.SQLiteOperationalError as error:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, error.metrics)
prometheus.METRICS.SQLITE_OPERATIONAL_ERROR_COUNT.inc()
self.session_mgr.db_operational_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute')
except Exception:
log.exception("dear devs, please handle this exception better")
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, {})
prometheus.METRICS.SQLITE_INTERNAL_ERROR_COUNT.inc()
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
if self.env.track_metrics:
@ -973,8 +1010,8 @@ class LBRYElectrumX(SessionBase):
metrics.query_response(start, metrics_data)
return base64.b64encode(result).decode()
finally:
prometheus.METRICS.SQLITE_PENDING_COUNT.dec()
prometheus.METRICS.SQLITE_EXECUTOR_TIMES.observe(time.perf_counter() - start)
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def run_and_cache_query(self, query_name, function, kwargs):
metrics = self.get_metrics_or_placeholder_for_api(query_name)
@ -1441,10 +1478,10 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST,
f'unsupported client: {client_name}')
if self.client_version != client_name[:17]:
prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).dec()
self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
self.client_version = client_name[:17]
prometheus.METRICS.SESSIONS_COUNT.labels(version=self.client_version).inc()
prometheus.METRICS.CLIENT_VERSIONS.labels(version=self.client_version).inc()
self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
self.session_mgr.client_version_metric.labels(version=self.client_version).inc()
# Find the highest common protocol version. Disconnect if
# that protocol version in unsupported.

View file

@ -2,7 +2,6 @@ import logging
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server import prometheus
class BlockchainReorganizationTests(CommandTestCase):
@ -16,7 +15,8 @@ class BlockchainReorganizationTests(CommandTestCase):
)
async def test_reorg(self):
prometheus.METRICS.REORG_COUNT.set(0)
bp = self.conductor.spv_node.server.bp
bp.reorg_count_metric.set(0)
# invalidate current block, move forward 2
self.assertEqual(self.ledger.headers.height, 206)
await self.assertBlockHash(206)
@ -26,7 +26,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(self.ledger.headers.height, 207)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
self.assertEqual(1, prometheus.METRICS.REORG_COUNT._samples()[0][2])
self.assertEqual(1, bp.reorg_count_metric._samples()[0][2])
# invalidate current block, move forward 3
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase):
await self.assertBlockHash(206)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
self.assertEqual(2, prometheus.METRICS.REORG_COUNT._samples()[0][2])
self.assertEqual(2, bp.reorg_count_metric._samples()[0][2])
async def test_reorg_change_claim_height(self):
# sanity check