update prometheus

This commit is contained in:
Jack Robison 2022-02-13 15:33:33 -05:00
parent 32d2208fd9
commit 7f8268703c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 47 additions and 7 deletions

View file

@ -58,7 +58,6 @@ class StagedClaimtrieItem(typing.NamedTuple):
) )
NAMESPACE = "wallet_server"
HISTOGRAM_BUCKETS = ( HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
) )
@ -72,13 +71,13 @@ class BlockProcessor:
""" """
block_count_metric = Gauge( block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE "block_count", "Number of processed blocks", namespace="block_processor"
) )
block_update_time_metric = Histogram( block_update_time_metric = Histogram(
"block_time", "Block update times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS "block_time", "Block update times", namespace="block_processor", buckets=HISTOGRAM_BUCKETS
) )
reorg_count_metric = Gauge( reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace="block_processor"
) )
def __init__(self, env: 'Env'): def __init__(self, env: 'Env'):

View file

@ -1,9 +1,9 @@
import signal import signal
import logging import logging
import asyncio import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
import typing import typing
from concurrent.futures.thread import ThreadPoolExecutor
from prometheus_client import Gauge, Histogram
import lbry import lbry
from lbry.wallet.server.mempool import MemPool from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db.prefixes import DBState from lbry.wallet.server.db.prefixes import DBState
@ -14,7 +14,22 @@ from lbry.wallet.server.session import LBRYSessionManager
from lbry.prometheus import PrometheusServer from lbry.prometheus import PrometheusServer
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
class BlockchainReader: class BlockchainReader:
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace="blockchain_reader"
)
block_update_time_metric = Histogram(
"block_time", "Block update times", namespace="blockchain_reader", buckets=HISTOGRAM_BUCKETS
)
reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace="blockchain_reader"
)
def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'): def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'):
self.env = env self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
@ -47,6 +62,7 @@ class BlockchainReader:
self.log.warning("reorg detected, waiting until the writer has flushed the new blocks to advance") self.log.warning("reorg detected, waiting until the writer has flushed the new blocks to advance")
return return
last_height = 0 if not self.last_state else self.last_state.height last_height = 0 if not self.last_state else self.last_state.height
rewound = False
if self.last_state: if self.last_state:
while True: while True:
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False): if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
@ -55,7 +71,10 @@ class BlockchainReader:
else: else:
self.log.warning("disconnect block %i", last_height) self.log.warning("disconnect block %i", last_height)
self.unwind() self.unwind()
rewound = True
last_height -= 1 last_height -= 1
if rewound:
self.reorg_count_metric.inc()
self.db.read_db_state() self.db.read_db_state()
if not self.last_state or last_height < state.height: if not self.last_state or last_height < state.height:
for height in range(last_height + 1, state.height + 1): for height in range(last_height + 1, state.height + 1):
@ -63,6 +82,7 @@ class BlockchainReader:
self.advance(height) self.advance(height)
self.clear_caches() self.clear_caches()
self.last_state = state self.last_state = state
self.block_count_metric.set(self.last_state.height)
self.db.blocked_streams, self.db.blocked_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes( self.db.blocked_streams, self.db.blocked_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes(
self.db.blocking_channel_hashes self.db.blocking_channel_hashes
) )
@ -107,6 +127,16 @@ class BlockchainReader:
class BlockchainReaderServer(BlockchainReader): class BlockchainReaderServer(BlockchainReader):
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace="wallet_server"
)
block_update_time_metric = Histogram(
"block_time", "Block update times", namespace="wallet_server", buckets=HISTOGRAM_BUCKETS
)
reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace="wallet_server"
)
def __init__(self, env): def __init__(self, env):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.history_cache = {} self.history_cache = {}

View file

@ -8,6 +8,7 @@ import asyncio
import logging import logging
from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch import AsyncElasticsearch, NotFoundError
from elasticsearch.helpers import async_streaming_bulk from elasticsearch.helpers import async_streaming_bulk
from prometheus_client import Gauge, Histogram
from lbry.schema.result import Censor from lbry.schema.result import Censor
from lbry.wallet.server.db.elasticsearch.search import IndexVersionMismatch from lbry.wallet.server.db.elasticsearch.search import IndexVersionMismatch
@ -15,7 +16,7 @@ from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS, INDEX_DEFA
from lbry.wallet.server.db.elasticsearch.common import expand_query from lbry.wallet.server.db.elasticsearch.common import expand_query
from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol
from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from lbry.wallet.server.chain_reader import BlockchainReader from lbry.wallet.server.chain_reader import BlockchainReader, HISTOGRAM_BUCKETS
from lbry.wallet.server.db.revertable import RevertableOp from lbry.wallet.server.db.revertable import RevertableOp
from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db.common import TrendingNotification
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
@ -26,6 +27,16 @@ log = logging.getLogger()
class ElasticWriter(BlockchainReader): class ElasticWriter(BlockchainReader):
VERSION = 1 VERSION = 1
prometheus_namespace = ""
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace="elastic_sync"
)
block_update_time_metric = Histogram(
"block_time", "Block update times", namespace="elastic_sync", buckets=HISTOGRAM_BUCKETS
)
reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace="elastic_sync"
)
def __init__(self, env): def __init__(self, env):
super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer') super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer')