forked from LBRYCommunity/lbry-sdk
add storing_peers and peer_manager_keys
This commit is contained in:
parent
692af0cc55
commit
06da53ef09
2 changed files with 23 additions and 1 deletions
|
@ -2,6 +2,9 @@ import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
|
from prometheus_client import Gauge
|
||||||
|
|
||||||
from lbry.utils import resolve_host
|
from lbry.utils import resolve_host
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.peer import make_kademlia_peer
|
from lbry.dht.peer import make_kademlia_peer
|
||||||
|
@ -17,6 +20,10 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Node:
|
class Node:
|
||||||
|
storing_peers_metric = Gauge(
|
||||||
|
"storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node",
|
||||||
|
labelnames=("scope",),
|
||||||
|
)
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
||||||
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
|
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
|
||||||
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
|
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
|
||||||
|
@ -44,7 +51,9 @@ class Node:
|
||||||
# add all peers in the routing table
|
# add all peers in the routing table
|
||||||
total_peers.extend(self.protocol.routing_table.get_peers())
|
total_peers.extend(self.protocol.routing_table.get_peers())
|
||||||
# add all the peers who have announced blobs to us
|
# add all the peers who have announced blobs to us
|
||||||
total_peers.extend(self.protocol.data_store.get_storing_contacts())
|
storing_peers = self.protocol.data_store.get_storing_contacts()
|
||||||
|
self.storing_peers_metric.labels("global").set(len(storing_peers))
|
||||||
|
total_peers.extend(storing_peers)
|
||||||
|
|
||||||
# get ids falling in the midpoint of each bucket that hasn't been recently updated
|
# get ids falling in the midpoint of each bucket that hasn't been recently updated
|
||||||
node_ids = self.protocol.routing_table.get_refresh_list(0, True)
|
node_ids = self.protocol.routing_table.get_refresh_list(0, True)
|
||||||
|
|
|
@ -3,6 +3,9 @@ import asyncio
|
||||||
import logging
|
import logging
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
|
from prometheus_client import Gauge
|
||||||
|
|
||||||
from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
|
from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address
|
from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address
|
||||||
|
@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False):
|
||||||
|
|
||||||
|
|
||||||
class PeerManager:
|
class PeerManager:
|
||||||
|
peer_manager_keys_metric = Gauge(
|
||||||
|
"peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node",
|
||||||
|
labelnames=("scope",)
|
||||||
|
)
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop):
|
def __init__(self, loop: asyncio.AbstractEventLoop):
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._rpc_failures: typing.Dict[
|
self._rpc_failures: typing.Dict[
|
||||||
|
@ -38,6 +45,11 @@ class PeerManager:
|
||||||
self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE)
|
self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE)
|
||||||
self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE)
|
self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE)
|
||||||
|
|
||||||
|
def count_cache_keys(self):
|
||||||
|
return len(self._rpc_failures) + len(self._last_replied) + len(self._last_sent) + len(
|
||||||
|
self._last_requested) + len(self._node_id_mapping) + len(self._node_id_reverse_mapping) + len(
|
||||||
|
self._node_tokens)
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested):
|
for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested):
|
||||||
statistic.clear()
|
statistic.clear()
|
||||||
|
@ -86,6 +98,7 @@ class PeerManager:
|
||||||
self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id))
|
self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id))
|
||||||
self._node_id_mapping[(address, udp_port)] = node_id
|
self._node_id_mapping[(address, udp_port)] = node_id
|
||||||
self._node_id_reverse_mapping[node_id] = (address, udp_port)
|
self._node_id_reverse_mapping[node_id] = (address, udp_port)
|
||||||
|
self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())
|
||||||
|
|
||||||
def prune(self): # TODO: periodically call this
|
def prune(self): # TODO: periodically call this
|
||||||
now = self._loop.time()
|
now = self._loop.time()
|
||||||
|
|
Loading…
Add table
Reference in a new issue