diff --git a/hub/common.py b/hub/common.py index f077c01..4376663 100644 --- a/hub/common.py +++ b/hub/common.py @@ -28,6 +28,9 @@ CLAIM_HASH_LEN = 20 HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) +SIZE_BUCKETS = ( + 1, 10, 100, 500, 1000, 2000, 4000, 7500, 10000, 15000, 25000, 50000, 75000, 100000, 150000, 250000, float('inf') +) CLAIM_TYPES = { 'stream': 1, diff --git a/hub/herald/session.py b/hub/herald/session.py index 7060684..6c5159e 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -21,7 +21,7 @@ from hub import __version__, PROMETHEUS_NAMESPACE from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex -from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time +from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification @@ -146,7 +146,6 @@ class SessionManager: pending_query_metric = Gauge( "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE ) - client_version_metric = Counter( "clients", "Number of connections received per client version", namespace=NAMESPACE, labelnames=("version",) @@ -155,6 +154,14 @@ class SessionManager: "address_history", "Time to fetch an address history", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) + address_subscription_metric = Gauge( + "address_subscriptions", "Number of subscribed addresses", + namespace=NAMESPACE + ) + address_history_size_metric = Histogram( + "history_size", "Sizes of histories for subscribed addresses", + namespace=NAMESPACE, buckets=SIZE_BUCKETS + ) notifications_in_flight_metric = Gauge( "notifications_in_flight", "Count of notifications in flight", namespace=NAMESPACE @@ -596,13 +603,16 @@ class SessionManager: return hex_hash async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: - if hashX in self.hashX_full_cache: - return self.hashX_full_cache[hashX] + cached_full_history = self.hashX_full_cache.get(hashX) + if cached_full_history is not None: + self.address_history_size_metric.observe(len(cached_full_history)) + return cached_full_history if hashX not in self.hashX_history_cache: limit = self.env.max_send // 97 self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) else: tx_nums = self.hashX_history_cache[hashX] + self.address_history_size_metric.observe(len(tx_nums)) needed_tx_infos = [] append_needed_tx_info = needed_tx_infos.append tx_infos = {} @@ -653,6 +663,7 @@ class SessionManager: def remove_session(self, session): """Remove a session from our sessions list if there.""" session_id = id(session) + self.address_subscription_metric.dec(len(session.hashX_subs)) for hashX in session.hashX_subs: sessions = self.hashx_subscriptions_by_session[hashX] sessions.remove(session_id) @@ -1378,6 +1389,8 @@ class LBRYElectrumX(asyncio.Protocol): sessions.remove(id(self)) except KeyError: pass + else: + self.session_manager.address_subscription_metric.dec() if not sessions: self.hashX_subs.pop(hashX, None) @@ -1415,6 +1428,7 @@ class LBRYElectrumX(asyncio.Protocol): if len(addresses) > 1000: raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') results = [] + self.session_manager.address_subscription_metric.inc(len(addresses)) for address in addresses: results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) await asyncio.sleep(0) @@ -1473,6 +1487,7 @@ class LBRYElectrumX(asyncio.Protocol): scripthash: the SHA256 hash of the script to subscribe to""" hashX = scripthash_to_hashX(scripthash) + self.session_manager.address_subscription_metric.inc() return await self.hashX_subscribe(hashX, scripthash) async def scripthash_unsubscribe(self, scripthash: str):