From a22f50aa84ce8f00e1e2a3548064dcf68db433c0 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 10 Dec 2021 03:48:33 -0300
Subject: [PATCH] add storing_peers and peer_manager_keys

---
 lbry/dht/node.py | 11 ++++++++++-
 lbry/dht/peer.py | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 300c1a774..d9217529b 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -2,6 +2,9 @@ import logging
 import asyncio
 import typing
 import socket
+
+from prometheus_client import Gauge
+
 from lbry.utils import resolve_host
 from lbry.dht import constants
 from lbry.dht.peer import make_kademlia_peer
@@ -17,6 +20,10 @@ log = logging.getLogger(__name__)
 
 
 class Node:
+    storing_peers_metric = Gauge(
+        "storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node",
+        labelnames=("scope",),
+    )
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
                  internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
                  split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
@@ -44,7 +51,9 @@ class Node:
             # add all peers in the routing table
             total_peers.extend(self.protocol.routing_table.get_peers())
             # add all the peers who have announced blobs to us
-            total_peers.extend(self.protocol.data_store.get_storing_contacts())
+            storing_peers = self.protocol.data_store.get_storing_contacts()
+            self.storing_peers_metric.labels("global").set(len(storing_peers))
+            total_peers.extend(storing_peers)
 
             # get ids falling in the midpoint of each bucket that hasn't been recently updated
             node_ids = self.protocol.routing_table.get_refresh_list(0, True)
diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py
index ae4fe2191..9413eec94 100644
--- a/lbry/dht/peer.py
+++ b/lbry/dht/peer.py
@@ -3,6 +3,9 @@ import asyncio
 import logging
 from dataclasses import dataclass, field
 from functools import lru_cache
+
+from prometheus_client import Gauge
+
 from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
 from lbry.dht import constants
 from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address
@@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False):
 
 
 class PeerManager:
+    peer_manager_keys_metric = Gauge(
+        "peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node",
+        labelnames=("scope",)
+    )
     def __init__(self, loop: asyncio.AbstractEventLoop):
         self._loop = loop
         self._rpc_failures: typing.Dict[
@@ -38,6 +45,11 @@ class PeerManager:
         self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE)
         self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE)
 
+    def count_cache_keys(self):
+        """Return the total number of keys held across all PeerManager tracking dicts."""
+        return sum(map(len, (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested,
+                             self._node_id_mapping, self._node_id_reverse_mapping, self._node_tokens)))
+
     def reset(self):
         for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested):
             statistic.clear()
@@ -86,6 +98,7 @@ class PeerManager:
             self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id))
         self._node_id_mapping[(address, udp_port)] = node_id
         self._node_id_reverse_mapping[node_id] = (address, udp_port)
+        self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())
 
     def prune(self):  # TODO: periodically call this
         now = self._loop.time()