diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 66165740b..718e339f5 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -8,6 +8,8 @@ import random from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport +from prometheus_client import Gauge + from lbry.dht import constants from lbry.dht.serialization.bencoding import DecodeError from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram @@ -30,6 +32,11 @@ OLD_PROTOCOL_ERRORS = { class KademliaRPC: + stored_blobs_metric = Gauge( + "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node", + labelnames=("scope",), + ) + def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333): self.protocol = protocol self.loop = loop @@ -61,6 +68,7 @@ class KademliaRPC: self.protocol.data_store.add_peer_to_blob( rpc_contact, blob_hash ) + self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store)) return b'OK' def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 4fea1266f..1dcfc9db0 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -4,6 +4,8 @@ import logging import typing import itertools +from prometheus_client import Gauge + from lbry.dht import constants from lbry.dht.protocol.distance import Distance if typing.TYPE_CHECKING: @@ -13,8 +15,13 @@ log = logging.getLogger(__name__) class KBucket: - """ Description - later """ + Kademlia K-bucket implementation. + """ + peers_in_routing_table_metric = Gauge( + "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes): """ @@ -58,6 +65,7 @@ class KBucket: return True if len(self.peers) < constants.K: self.peers.append(peer) + self.peers_in_routing_table_metric.labels("global").inc() return True else: return False @@ -124,6 +132,7 @@ class KBucket: def remove_peer(self, peer: 'KademliaPeer') -> None: self.peers.remove(peer) + self.peers_in_routing_table_metric.labels("global").dec() def key_in_range(self, key: bytes) -> bool: """ Tests whether the specified key (i.e. node ID) is in the range @@ -162,6 +171,10 @@ class TreeRoutingTable: ping RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. """ + buckets_in_routing_table_metric = Gauge( + "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): @@ -279,6 +292,7 @@ class TreeRoutingTable: # ...and remove them from the old bucket for contact in new_bucket.peers: old_bucket.remove_peer(contact) + self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) def join_buckets(self): if len(self.buckets) == 1: @@ -302,6 +316,7 @@ class TreeRoutingTable: elif can_go_higher: self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min self.buckets.remove(bucket) + self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) return self.join_buckets() def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 945b8988b..7343b0979 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -6,7 +6,7 @@ import os.path from io import StringIO from typing import Optional from aiohttp import web -from prometheus_client import generate_latest as prom_generate_latest, Gauge +from prometheus_client import generate_latest as prom_generate_latest from lbry.dht.constants import generate_id from lbry.dht.node import Node @@ -16,18 +16,6 @@ from lbry.conf import Config logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") log = logging.getLogger(__name__) -BLOBS_STORED = Gauge( - "blobs_stored", "Number of blob info received", namespace="dht_node", - labelnames=("method",) -) -PEERS = Gauge( - "known_peers", "Number of peers on routing table", namespace="dht_node", - labelnames=("method",) -) -ESTIMATED_SIZE = Gauge( - "passively_estimated_network_size", "Estimated network size from routing table", namespace="dht_node", - labelnames=("method",) -) class SimpleMetrics: @@ -129,11 +117,6 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional log.info("Peer with id %s started", node_id.hex()) while True: await asyncio.sleep(10) - PEERS.labels('main').set(len(node.protocol.routing_table.get_peers())) - peers = node.protocol.routing_table.get_peers() - close_ids = [peer for peer in peers if peer.node_id[0] == node.protocol.node_id[0]] - ESTIMATED_SIZE.labels('main').set(len(close_ids) * 256) - BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts())) log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.", len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store), len(node.protocol.data_store.get_storing_contacts()))