address_subscriptions gauge and history_size histogram metrics

This commit is contained in:
Jack Robison 2022-05-20 11:58:18 -04:00
parent e9f2b1efea
commit 4466bb1451
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 22 additions and 4 deletions

View file

@ -28,6 +28,9 @@ CLAIM_HASH_LEN = 20
HISTOGRAM_BUCKETS = ( HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
) )
SIZE_BUCKETS = (
1, 10, 100, 500, 1000, 2000, 4000, 7500, 10000, 15000, 25000, 50000, 75000, 100000, 150000, 250000, float('inf')
)
CLAIM_TYPES = { CLAIM_TYPES = {
'stream': 1, 'stream': 1,

View file

@ -21,7 +21,7 @@ from hub import __version__, PROMETHEUS_NAMESPACE
from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from hub.herald.search import SearchIndex from hub.herald.search import SearchIndex
from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics
from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
@ -146,7 +146,6 @@ class SessionManager:
pending_query_metric = Gauge( pending_query_metric = Gauge(
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
) )
client_version_metric = Counter( client_version_metric = Counter(
"clients", "Number of connections received per client version", "clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",) namespace=NAMESPACE, labelnames=("version",)
@ -155,6 +154,14 @@ class SessionManager:
"address_history", "Time to fetch an address history", "address_history", "Time to fetch an address history",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
address_subscription_metric = Gauge(
"address_subscriptions", "Number of subscribed addresses",
namespace=NAMESPACE
)
address_history_size_metric = Histogram(
"history_size", "Sizes of histories for subscribed addresses",
namespace=NAMESPACE, buckets=SIZE_BUCKETS
)
notifications_in_flight_metric = Gauge( notifications_in_flight_metric = Gauge(
"notifications_in_flight", "Count of notifications in flight", "notifications_in_flight", "Count of notifications in flight",
namespace=NAMESPACE namespace=NAMESPACE
@ -596,13 +603,16 @@ class SessionManager:
return hex_hash return hex_hash
async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]:
if hashX in self.hashX_full_cache: cached_full_history = self.hashX_full_cache.get(hashX)
return self.hashX_full_cache[hashX] if cached_full_history is not None:
self.address_history_size_metric.observe(len(cached_full_history))
return cached_full_history
if hashX not in self.hashX_history_cache: if hashX not in self.hashX_history_cache:
limit = self.env.max_send // 97 limit = self.env.max_send // 97
self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit)
else: else:
tx_nums = self.hashX_history_cache[hashX] tx_nums = self.hashX_history_cache[hashX]
self.address_history_size_metric.observe(len(tx_nums))
needed_tx_infos = [] needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append append_needed_tx_info = needed_tx_infos.append
tx_infos = {} tx_infos = {}
@ -653,6 +663,7 @@ class SessionManager:
def remove_session(self, session): def remove_session(self, session):
"""Remove a session from our sessions list if there.""" """Remove a session from our sessions list if there."""
session_id = id(session) session_id = id(session)
self.address_subscription_metric.dec(len(session.hashX_subs))
for hashX in session.hashX_subs: for hashX in session.hashX_subs:
sessions = self.hashx_subscriptions_by_session[hashX] sessions = self.hashx_subscriptions_by_session[hashX]
sessions.remove(session_id) sessions.remove(session_id)
@ -1378,6 +1389,8 @@ class LBRYElectrumX(asyncio.Protocol):
sessions.remove(id(self)) sessions.remove(id(self))
except KeyError: except KeyError:
pass pass
else:
self.session_manager.address_subscription_metric.dec()
if not sessions: if not sessions:
self.hashX_subs.pop(hashX, None) self.hashX_subs.pop(hashX, None)
@ -1415,6 +1428,7 @@ class LBRYElectrumX(asyncio.Protocol):
if len(addresses) > 1000: if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
results = [] results = []
self.session_manager.address_subscription_metric.inc(len(addresses))
for address in addresses: for address in addresses:
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0) await asyncio.sleep(0)
@ -1473,6 +1487,7 @@ class LBRYElectrumX(asyncio.Protocol):
scripthash: the SHA256 hash of the script to subscribe to""" scripthash: the SHA256 hash of the script to subscribe to"""
hashX = scripthash_to_hashX(scripthash) hashX = scripthash_to_hashX(scripthash)
self.session_manager.address_subscription_metric.inc()
return await self.hashX_subscribe(hashX, scripthash) return await self.hashX_subscribe(hashX, scripthash)
async def scripthash_unsubscribe(self, scripthash: str): async def scripthash_unsubscribe(self, scripthash: str):