From 06da53ef097ea0fa09c2756eb9162705aa289403 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 10 Dec 2021 03:48:33 -0300 Subject: [PATCH] add storing_peers and peer_manager_keys --- lbry/dht/node.py | 11 ++++++++++- lbry/dht/peer.py | 13 +++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 300c1a774..d9217529b 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -2,6 +2,9 @@ import logging import asyncio import typing import socket + +from prometheus_client import Gauge + from lbry.utils import resolve_host from lbry.dht import constants from lbry.dht.peer import make_kademlia_peer @@ -17,6 +20,10 @@ log = logging.getLogger(__name__) class Node: + storing_peers_metric = Gauge( + "storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node", + labelnames=("scope",), + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, @@ -44,7 +51,9 @@ class Node: # add all peers in the routing table total_peers.extend(self.protocol.routing_table.get_peers()) # add all the peers who have announced blobs to us - total_peers.extend(self.protocol.data_store.get_storing_contacts()) + storing_peers = self.protocol.data_store.get_storing_contacts() + self.storing_peers_metric.labels("global").set(len(storing_peers)) + total_peers.extend(storing_peers) # get ids falling in the midpoint of each bucket that hasn't been recently updated node_ids = self.protocol.routing_table.get_refresh_list(0, True) diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index ae4fe2191..9413eec94 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -3,6 +3,9 @@ import asyncio import logging from dataclasses import dataclass, field from functools import lru_cache + +from prometheus_client import Gauge + from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache from lbry.dht import constants from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address @@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False): class PeerManager: + peer_manager_keys_metric = Gauge( + "peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, loop: asyncio.AbstractEventLoop): self._loop = loop self._rpc_failures: typing.Dict[ @@ -38,6 +45,11 @@ class PeerManager: self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE) self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE) + def count_cache_keys(self): + return len(self._rpc_failures) + len(self._last_replied) + len(self._last_sent) + len( + self._last_requested) + len(self._node_id_mapping) + len(self._node_id_reverse_mapping) + len( + self._node_tokens) + def reset(self): for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested): statistic.clear() @@ -86,6 +98,7 @@ class PeerManager: self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id)) self._node_id_mapping[(address, udp_port)] = node_id self._node_id_reverse_mapping[node_id] = (address, udp_port) + self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys()) def prune(self): # TODO: periodically call this now = self._loop.time()