From 7f8268703c2054c649a3c3bd8ee1d357ece72694 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 13 Feb 2022 15:33:33 -0500 Subject: [PATCH] update prometheus --- lbry/wallet/server/block_processor.py | 7 ++--- lbry/wallet/server/chain_reader.py | 34 +++++++++++++++++++-- lbry/wallet/server/db/elasticsearch/sync.py | 13 +++++++- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 0daddce4c..21f2b9b37 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -58,7 +58,6 @@ class StagedClaimtrieItem(typing.NamedTuple): ) -NAMESPACE = "wallet_server" HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) @@ -72,13 +71,13 @@ class BlockProcessor: """ block_count_metric = Gauge( - "block_count", "Number of processed blocks", namespace=NAMESPACE + "block_count", "Number of processed blocks", namespace="block_processor" ) block_update_time_metric = Histogram( - "block_time", "Block update times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + "block_time", "Block update times", namespace="block_processor", buckets=HISTOGRAM_BUCKETS ) reorg_count_metric = Gauge( - "reorg_count", "Number of reorgs", namespace=NAMESPACE + "reorg_count", "Number of reorgs", namespace="block_processor" ) def __init__(self, env: 'Env'): diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 76d0347dc..29379099b 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -1,9 +1,9 @@ import signal import logging import asyncio -from concurrent.futures.thread import ThreadPoolExecutor import typing - +from concurrent.futures.thread import ThreadPoolExecutor +from prometheus_client import Gauge, Histogram import lbry from lbry.wallet.server.mempool import MemPool from lbry.wallet.server.db.prefixes import DBState @@ -14,7 +14,22 @@ from lbry.wallet.server.session import LBRYSessionManager from lbry.prometheus import PrometheusServer +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + + class BlockchainReader: + block_count_metric = Gauge( + "block_count", "Number of processed blocks", namespace="blockchain_reader" + ) + block_update_time_metric = Histogram( + "block_time", "Block update times", namespace="blockchain_reader", buckets=HISTOGRAM_BUCKETS + ) + reorg_count_metric = Gauge( + "reorg_count", "Number of reorgs", namespace="blockchain_reader" + ) + def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'): self.env = env self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) @@ -47,6 +62,7 @@ class BlockchainReader: self.log.warning("reorg detected, waiting until the writer has flushed the new blocks to advance") return last_height = 0 if not self.last_state else self.last_state.height + rewound = False if self.last_state: while True: if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False): @@ -55,7 +71,10 @@ class BlockchainReader: else: self.log.warning("disconnect block %i", last_height) self.unwind() + rewound = True last_height -= 1 + if rewound: + self.reorg_count_metric.inc() self.db.read_db_state() if not self.last_state or last_height < state.height: for height in range(last_height + 1, state.height + 1): @@ -63,6 +82,7 @@ class BlockchainReader: self.advance(height) self.clear_caches() self.last_state = state + self.block_count_metric.set(self.last_state.height) self.db.blocked_streams, self.db.blocked_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes( self.db.blocking_channel_hashes ) @@ -107,6 +127,16 @@ class BlockchainReader: class BlockchainReaderServer(BlockchainReader): + block_count_metric = Gauge( + "block_count", "Number of processed blocks", namespace="wallet_server" + ) + block_update_time_metric = Histogram( + "block_time", "Block update times", namespace="wallet_server", buckets=HISTOGRAM_BUCKETS + ) + reorg_count_metric = Gauge( + "reorg_count", "Number of reorgs", namespace="wallet_server" + ) + def __init__(self, env): super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') self.history_cache = {} diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 3dd92cb38..78107c789 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -8,6 +8,7 @@ import asyncio import logging from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch.helpers import async_streaming_bulk +from prometheus_client import Gauge, Histogram from lbry.schema.result import Censor from lbry.wallet.server.db.elasticsearch.search import IndexVersionMismatch @@ -15,7 +16,7 @@ from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS, INDEX_DEFA from lbry.wallet.server.db.elasticsearch.common import expand_query from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT -from lbry.wallet.server.chain_reader import BlockchainReader +from lbry.wallet.server.chain_reader import BlockchainReader, HISTOGRAM_BUCKETS from lbry.wallet.server.db.revertable import RevertableOp from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db import DB_PREFIXES @@ -26,6 +27,16 @@ log = logging.getLogger() class ElasticWriter(BlockchainReader): VERSION = 1 + prometheus_namespace = "" + block_count_metric = Gauge( + "block_count", "Number of processed blocks", namespace="elastic_sync" + ) + block_update_time_metric = Histogram( + "block_time", "Block update times", namespace="elastic_sync", buckets=HISTOGRAM_BUCKETS + ) + reorg_count_metric = Gauge( + "reorg_count", "Number of reorgs", namespace="elastic_sync" + ) def __init__(self, env): super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer')