forked from LBRYCommunity/lbry-sdk
prometheus: move blobs_stored and peers to SDK. add buckets_in_routing_table
This commit is contained in:
parent
470ee72462
commit
2d9130b4e0
3 changed files with 25 additions and 19 deletions
|
@ -8,6 +8,8 @@ import random
|
||||||
from asyncio.protocols import DatagramProtocol
|
from asyncio.protocols import DatagramProtocol
|
||||||
from asyncio.transports import DatagramTransport
|
from asyncio.transports import DatagramTransport
|
||||||
|
|
||||||
|
from prometheus_client import Gauge
|
||||||
|
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.serialization.bencoding import DecodeError
|
from lbry.dht.serialization.bencoding import DecodeError
|
||||||
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
|
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
|
||||||
|
@ -30,6 +32,11 @@ OLD_PROTOCOL_ERRORS = {
|
||||||
|
|
||||||
|
|
||||||
class KademliaRPC:
|
class KademliaRPC:
|
||||||
|
stored_blobs_metric = Gauge(
|
||||||
|
"stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
|
||||||
|
labelnames=("scope",),
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
|
def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
|
||||||
self.protocol = protocol
|
self.protocol = protocol
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
|
@ -61,6 +68,7 @@ class KademliaRPC:
|
||||||
self.protocol.data_store.add_peer_to_blob(
|
self.protocol.data_store.add_peer_to_blob(
|
||||||
rpc_contact, blob_hash
|
rpc_contact, blob_hash
|
||||||
)
|
)
|
||||||
|
self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store))
|
||||||
return b'OK'
|
return b'OK'
|
||||||
|
|
||||||
def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
|
def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
|
||||||
|
|
|
@ -4,6 +4,8 @@ import logging
|
||||||
import typing
|
import typing
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
|
from prometheus_client import Gauge
|
||||||
|
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.protocol.distance import Distance
|
from lbry.dht.protocol.distance import Distance
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
|
@ -13,8 +15,13 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class KBucket:
|
class KBucket:
|
||||||
""" Description - later
|
|
||||||
"""
|
"""
|
||||||
|
Kademlia K-bucket implementation.
|
||||||
|
"""
|
||||||
|
peers_in_routing_table_metric = Gauge(
|
||||||
|
"peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
|
||||||
|
labelnames=("scope",)
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
|
def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
|
||||||
"""
|
"""
|
||||||
|
@ -58,6 +65,7 @@ class KBucket:
|
||||||
return True
|
return True
|
||||||
if len(self.peers) < constants.K:
|
if len(self.peers) < constants.K:
|
||||||
self.peers.append(peer)
|
self.peers.append(peer)
|
||||||
|
self.peers_in_routing_table_metric.labels("global").inc()
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
@ -124,6 +132,7 @@ class KBucket:
|
||||||
|
|
||||||
def remove_peer(self, peer: 'KademliaPeer') -> None:
|
def remove_peer(self, peer: 'KademliaPeer') -> None:
|
||||||
self.peers.remove(peer)
|
self.peers.remove(peer)
|
||||||
|
self.peers_in_routing_table_metric.labels("global").dec()
|
||||||
|
|
||||||
def key_in_range(self, key: bytes) -> bool:
|
def key_in_range(self, key: bytes) -> bool:
|
||||||
""" Tests whether the specified key (i.e. node ID) is in the range
|
""" Tests whether the specified key (i.e. node ID) is in the range
|
||||||
|
@ -162,6 +171,10 @@ class TreeRoutingTable:
|
||||||
ping RPC-based k-bucket eviction algorithm described in section 2.2 of
|
ping RPC-based k-bucket eviction algorithm described in section 2.2 of
|
||||||
that paper.
|
that paper.
|
||||||
"""
|
"""
|
||||||
|
buckets_in_routing_table_metric = Gauge(
|
||||||
|
"buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
|
||||||
|
labelnames=("scope",)
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
|
||||||
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
|
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
|
||||||
|
@ -279,6 +292,7 @@ class TreeRoutingTable:
|
||||||
# ...and remove them from the old bucket
|
# ...and remove them from the old bucket
|
||||||
for contact in new_bucket.peers:
|
for contact in new_bucket.peers:
|
||||||
old_bucket.remove_peer(contact)
|
old_bucket.remove_peer(contact)
|
||||||
|
self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets))
|
||||||
|
|
||||||
def join_buckets(self):
|
def join_buckets(self):
|
||||||
if len(self.buckets) == 1:
|
if len(self.buckets) == 1:
|
||||||
|
@ -302,6 +316,7 @@ class TreeRoutingTable:
|
||||||
elif can_go_higher:
|
elif can_go_higher:
|
||||||
self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
|
self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
|
||||||
self.buckets.remove(bucket)
|
self.buckets.remove(bucket)
|
||||||
|
self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets))
|
||||||
return self.join_buckets()
|
return self.join_buckets()
|
||||||
|
|
||||||
def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
|
def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
|
||||||
|
|
|
@ -6,7 +6,7 @@ import os.path
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from prometheus_client import generate_latest as prom_generate_latest, Gauge
|
from prometheus_client import generate_latest as prom_generate_latest
|
||||||
|
|
||||||
from lbry.dht.constants import generate_id
|
from lbry.dht.constants import generate_id
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
|
@ -16,18 +16,6 @@ from lbry.conf import Config
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
BLOBS_STORED = Gauge(
|
|
||||||
"blobs_stored", "Number of blob info received", namespace="dht_node",
|
|
||||||
labelnames=("method",)
|
|
||||||
)
|
|
||||||
PEERS = Gauge(
|
|
||||||
"known_peers", "Number of peers on routing table", namespace="dht_node",
|
|
||||||
labelnames=("method",)
|
|
||||||
)
|
|
||||||
ESTIMATED_SIZE = Gauge(
|
|
||||||
"passively_estimated_network_size", "Estimated network size from routing table", namespace="dht_node",
|
|
||||||
labelnames=("method",)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class SimpleMetrics:
|
class SimpleMetrics:
|
||||||
|
@ -129,11 +117,6 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
|
||||||
log.info("Peer with id %s started", node_id.hex())
|
log.info("Peer with id %s started", node_id.hex())
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(10)
|
||||||
PEERS.labels('main').set(len(node.protocol.routing_table.get_peers()))
|
|
||||||
peers = node.protocol.routing_table.get_peers()
|
|
||||||
close_ids = [peer for peer in peers if peer.node_id[0] == node.protocol.node_id[0]]
|
|
||||||
ESTIMATED_SIZE.labels('main').set(len(close_ids) * 256)
|
|
||||||
BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts()))
|
|
||||||
log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
|
log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
|
||||||
len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
|
len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
|
||||||
len(node.protocol.data_store.get_storing_contacts()))
|
len(node.protocol.data_store.get_storing_contacts()))
|
||||||
|
|
Loading…
Reference in a new issue