forked from LBRYCommunity/lbry-sdk

Merge pull request #3500 from lbryio/fix_script

Add Prometheus metrics for DHT internals

commit f78e3825ca, 6 changed files with 130 additions and 24 deletions
@@ -2,6 +2,9 @@ import logging
 import asyncio
 import typing
 import socket
+
+from prometheus_client import Gauge
+
 from lbry.utils import resolve_host
 from lbry.dht import constants
 from lbry.dht.peer import make_kademlia_peer
@@ -17,6 +20,14 @@ log = logging.getLogger(__name__)


 class Node:
+    storing_peers_metric = Gauge(
+        "storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node",
+        labelnames=("scope",),
+    )
+    stored_blob_with_x_bytes_colliding = Gauge(
+        "stored_blobs_x_bytes_colliding", "Number of blobs with at least X bytes colliding with this node id prefix",
+        namespace="dht_node", labelnames=("amount",)
+    )
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
                  internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
                  split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
@@ -44,7 +55,18 @@ class Node:
             # add all peers in the routing table
             total_peers.extend(self.protocol.routing_table.get_peers())
             # add all the peers who have announced blobs to us
-            total_peers.extend(self.protocol.data_store.get_storing_contacts())
+            storing_peers = self.protocol.data_store.get_storing_contacts()
+            self.storing_peers_metric.labels("global").set(len(storing_peers))
+            total_peers.extend(storing_peers)
+
+            counts = {0: 0, 1: 0, 2: 0}
+            node_id = self.protocol.node_id
+            for blob_hash in self.protocol.data_store.keys():
+                bytes_colliding = 0 if blob_hash[0] != node_id[0] else 2 if blob_hash[1] == node_id[1] else 1
+                counts[bytes_colliding] += 1
+            self.stored_blob_with_x_bytes_colliding.labels(amount=0).set(counts[0])
+            self.stored_blob_with_x_bytes_colliding.labels(amount=1).set(counts[1])
+            self.stored_blob_with_x_bytes_colliding.labels(amount=2).set(counts[2])

             # get ids falling in the midpoint of each bucket that hasn't been recently updated
             node_ids = self.protocol.routing_table.get_refresh_list(0, True)
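The hunk above, in the node's refresh loop, buckets every stored blob hash by how many leading bytes it shares with the local node id: 0, 1, or 2. A standalone sketch of that classification, assuming 48-byte ids as in lbry's DHT; `bytes_colliding` is a hypothetical helper written for illustration, not part of the diff:

```python
import os

def bytes_colliding(blob_hash: bytes, node_id: bytes) -> int:
    # 0 if the first bytes differ, 2 if the first two bytes match, 1 otherwise
    if blob_hash[0] != node_id[0]:
        return 0
    return 2 if blob_hash[1] == node_id[1] else 1

node_id = os.urandom(48)  # assumption: 48-byte ids, matching lbry's DHT hashes
counts = {0: 0, 1: 0, 2: 0}
for _ in range(100000):
    counts[bytes_colliding(os.urandom(48), node_id)] += 1
print(counts)  # roughly {0: 99609, 1: 389, 2: 1} for uniformly random hashes
```

For uniformly random hashes only about 1 in 256 shares the first byte, so the amount=1 and amount=2 gauges stay near zero unless the node is attracting an unusual concentration of announcements near its own id.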
@@ -3,6 +3,9 @@ import asyncio
 import logging
 from dataclasses import dataclass, field
 from functools import lru_cache
+
+from prometheus_client import Gauge
+
 from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
 from lbry.dht import constants
 from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address
@@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False):


 class PeerManager:
+    peer_manager_keys_metric = Gauge(
+        "peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node",
+        labelnames=("scope",)
+    )
     def __init__(self, loop: asyncio.AbstractEventLoop):
         self._loop = loop
         self._rpc_failures: typing.Dict[
@@ -38,6 +45,11 @@ class PeerManager:
         self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE)
         self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE)
+
+    def count_cache_keys(self):
+        return len(self._rpc_failures) + len(self._last_replied) + len(self._last_sent) + len(
+            self._last_requested) + len(self._node_id_mapping) + len(self._node_id_reverse_mapping) + len(
+            self._node_tokens)

     def reset(self):
         for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested):
             statistic.clear()
@@ -86,6 +98,7 @@ class PeerManager:
             self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id))
         self._node_id_mapping[(address, udp_port)] = node_id
         self._node_id_reverse_mapping[node_id] = (address, udp_port)
+        self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())

     def prune(self):  # TODO: periodically call this
         now = self._loop.time()
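The gauge added above is refreshed to the summed size of the PeerManager's LRU caches every time a node id is reported. A minimal self-contained sketch of the same prometheus_client pattern; `CacheHolder` and the `example_keys`/`demo` names are hypothetical stand-ins:

```python
from prometheus_client import Gauge, generate_latest

class CacheHolder:
    # class-level gauge with a "scope" label, mirroring the diff's pattern
    keys_metric = Gauge(
        "example_keys", "Total keys across tracked dicts", namespace="demo",
        labelnames=("scope",)
    )

    def __init__(self):
        self._addresses, self._tokens = {}, {}

    def count_cache_keys(self) -> int:
        return len(self._addresses) + len(self._tokens)

    def report(self, node_id: str, address: str):
        self._addresses[node_id] = address
        # set (not inc) so the gauge always reflects the current total
        self.keys_metric.labels("global").set(self.count_cache_keys())

holder = CacheHolder()
holder.report("abc123", "10.0.0.1")
print(generate_latest().decode())  # output includes: demo_example_keys{scope="global"} 1.0
```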
@@ -3,11 +3,14 @@ import socket
 import functools
 import hashlib
 import asyncio
 import time
 import typing
 import random
 from asyncio.protocols import DatagramProtocol
 from asyncio.transports import DatagramTransport
+
+from prometheus_client import Gauge, Counter, Histogram
+
 from lbry.dht import constants
 from lbry.dht.serialization.bencoding import DecodeError
 from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
@@ -30,6 +33,11 @@ OLD_PROTOCOL_ERRORS = {


 class KademliaRPC:
+    stored_blob_metric = Gauge(
+        "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
+        labelnames=("scope",),
+    )
+
     def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
         self.protocol = protocol
         self.loop = loop
@@ -61,6 +69,7 @@ class KademliaRPC:
         self.protocol.data_store.add_peer_to_blob(
             rpc_contact, blob_hash
         )
+        self.stored_blob_metric.labels("global").set(len(self.protocol.data_store))
         return b'OK'

     def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
@@ -259,6 +268,30 @@ class PingQueue:


 class KademliaProtocol(DatagramProtocol):
+    request_sent_metric = Counter(
+        "request_sent", "Number of requests sent from DHT RPC protocol", namespace="dht_node",
+        labelnames=("method",),
+    )
+    request_success_metric = Counter(
+        "request_success", "Number of successful requests", namespace="dht_node",
+        labelnames=("method",),
+    )
+    request_error_metric = Counter(
+        "request_error", "Number of errors returned from requests to other peers", namespace="dht_node",
+        labelnames=("method",),
+    )
+    HISTOGRAM_BUCKETS = (
+        .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf')
+    )
+    response_time_metric = Histogram(
+        "response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS,
+        labelnames=("method",)
+    )
+    received_request_metric = Counter(
+        "received_request", "Number of received DHT RPC requests", namespace="dht_node",
+        labelnames=("method",),
+    )
+
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
                  udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
                  split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
@@ -447,6 +480,7 @@ class KademliaProtocol(DatagramProtocol):

     def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
         # This is an RPC method request
+        self.received_request_metric.labels(method=request_datagram.method).inc()
         self.peer_manager.report_last_requested(address[0], address[1])
         try:
             peer = self.routing_table.get_peer(request_datagram.node_id)
@@ -575,14 +609,19 @@ class KademliaProtocol(DatagramProtocol):
         self._send(peer, request)
         response_fut = self.sent_messages[request.rpc_id][1]
         try:
+            self.request_sent_metric.labels(method=request.method).inc()
+            start = time.perf_counter()
             response = await asyncio.wait_for(response_fut, self.rpc_timeout)
+            self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
             self.peer_manager.report_last_replied(peer.address, peer.udp_port)
+            self.request_success_metric.labels(method=request.method).inc()
             return response
         except asyncio.CancelledError:
             if not response_fut.done():
                 response_fut.cancel()
             raise
         except (asyncio.TimeoutError, RemoteException):
+            self.request_error_metric.labels(method=request.method).inc()
             self.peer_manager.report_failure(peer.address, peer.udp_port)
             if self.peer_manager.peer_is_good(peer) is False:
                 self.remove_peer(peer)
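The last hunk wraps each outgoing RPC in three metrics: a counter incremented before the send, a histogram observation of the timed response, and an error counter on timeout or remote failure. A runnable sketch of that pattern; `instrumented_call` and `fake_ping` are illustrative names, not lbry API:

```python
import asyncio
import time
from prometheus_client import Counter, Histogram

REQUESTS = Counter("request_sent", "Requests sent", namespace="demo", labelnames=("method",))
ERRORS = Counter("request_error", "Failed requests", namespace="demo", labelnames=("method",))
LATENCY = Histogram("response_time", "Response times", namespace="demo", labelnames=("method",))

async def instrumented_call(method: str, coro_factory, timeout: float = 3.0):
    REQUESTS.labels(method=method).inc()           # count the attempt up front
    start = time.perf_counter()
    try:
        result = await asyncio.wait_for(coro_factory(), timeout)
        # only successful round trips contribute to the latency histogram
        LATENCY.labels(method=method).observe(time.perf_counter() - start)
        return result
    except asyncio.TimeoutError:
        ERRORS.labels(method=method).inc()         # timeouts count as errors
        raise

async def main():
    async def fake_ping():
        await asyncio.sleep(0.01)  # stand-in for a UDP round trip
        return b'pong'
    print(await instrumented_call("ping", fake_ping))

asyncio.run(main())
```

Counting the attempt before awaiting means `request_sent - request_success - request_error` approximates requests still in flight, which is presumably why the diff increments it ahead of `wait_for`.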
@@ -4,6 +4,8 @@ import logging
 import typing
 import itertools

+from prometheus_client import Gauge
+
 from lbry.dht import constants
 from lbry.dht.protocol.distance import Distance
 if typing.TYPE_CHECKING:
@@ -13,8 +15,17 @@ log = logging.getLogger(__name__)


 class KBucket:
-    """ Description - later
+    """
+    Kademlia K-bucket implementation.
     """
+    peer_in_routing_table_metric = Gauge(
+        "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
+        labelnames=("scope",)
+    )
+    peer_with_x_bit_colliding_metric = Gauge(
+        "peer_x_bit_colliding", "Number of peers with at least X bits colliding with this node id",
+        namespace="dht_node", labelnames=("amount",)
+    )

     def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
         """
@@ -58,6 +69,10 @@ class KBucket:
             return True
         if len(self.peers) < constants.K:
             self.peers.append(peer)
+            self.peer_in_routing_table_metric.labels("global").inc()
+            if peer.node_id[0] == self._node_id[0]:
+                bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
+                self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc()
             return True
         else:
             return False
@@ -124,6 +139,10 @@ class KBucket:

     def remove_peer(self, peer: 'KademliaPeer') -> None:
         self.peers.remove(peer)
+        self.peer_in_routing_table_metric.labels("global").dec()
+        if peer.node_id[0] == self._node_id[0]:
+            bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
+            self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec()

     def key_in_range(self, key: bytes) -> bool:
         """ Tests whether the specified key (i.e. node ID) is in the range
@@ -162,6 +181,10 @@ class TreeRoutingTable:
     ping RPC-based k-bucket eviction algorithm described in section 2.2 of
     that paper.
     """
+    bucket_in_routing_table_metric = Gauge(
+        "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
+        labelnames=("scope",)
+    )

     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
                  split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
@@ -279,6 +302,7 @@ class TreeRoutingTable:
             # ...and remove them from the old bucket
             for contact in new_bucket.peers:
                 old_bucket.remove_peer(contact)
+        self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))

     def join_buckets(self):
         if len(self.buckets) == 1:
@@ -302,6 +326,7 @@ class TreeRoutingTable:
         elif can_go_higher:
             self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
         self.buckets.remove(bucket)
+        self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
         return self.join_buckets()

     def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
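In the KBucket hunks, `8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()` counts the leading bits shared by the second bytes of the two ids: XOR is zero exactly where the bytes agree, so its bit length marks the first disagreeing bit. Adding 8 accounts for the first byte, which is already known to match. A small sketch with the hypothetical helper `leading_bits_in_common`:

```python
def leading_bits_in_common(peer_id: bytes, node_id: bytes) -> int:
    # XOR is zero where the bytes agree; 8 - bit_length = shared leading bits
    if peer_id[0] != node_id[0]:
        return 0  # the diff only updates the metric when the first byte matches
    bits_colliding = 8 - (peer_id[1] ^ node_id[1]).bit_length()
    return bits_colliding + 8  # the matching first byte contributes 8 bits

print(leading_bits_in_common(bytes([0xAB, 0b10110000]), bytes([0xAB, 0b10111111])))  # 12
print(leading_bits_in_common(bytes([0xAB, 0xCD]), bytes([0xAB, 0xCD])))              # 16
```

Because the gauge is inc'd in `add_peer` and dec'd in `remove_peer` with the same label, each label value tracks the live count of peers at that collision depth.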
@@ -5149,10 +5149,12 @@ class Daemon(metaclass=JSONRPCServerType):
                     ]
                 },
                 "node_id": (str) the local dht node id
+                "prefix_neighbors_count": (int) the number of peers sharing the same byte prefix as the local node id
             }
         """
         result = {
-            'buckets': {}
+            'buckets': {},
+            'prefix_neighbors_count': 0
         }

         for i, _ in enumerate(self.dht_node.protocol.routing_table.buckets):
@@ -5165,6 +5167,7 @@ class Daemon(metaclass=JSONRPCServerType):
                     "node_id": hexlify(peer.node_id).decode(),
                 }
                 result['buckets'][i].append(host)
+                result['prefix_neighbors_count'] += 1 if peer.node_id[0] == self.dht_node.protocol.node_id[0] else 0

         result['node_id'] = hexlify(self.dht_node.protocol.node_id).decode()
         return result
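The daemon hunks extend the routing-table status with `prefix_neighbors_count`, incremented once per routing-table peer whose node id starts with the same byte as the local id. The same computation in isolation, with made-up ids:

```python
# counts peers whose node id shares its first byte with the local node id;
# the hex ids below are fabricated for illustration only
local_node_id = bytes.fromhex("ab12cd")
peer_ids = [bytes.fromhex(h) for h in ("ab34ef", "cd56aa", "ab9900")]

prefix_neighbors_count = sum(1 if peer_id[0] == local_node_id[0] else 0 for peer_id in peer_ids)
print(prefix_neighbors_count)  # 2
```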
@@ -2,10 +2,11 @@ import asyncio
 import argparse
 import logging
 import csv
+import os.path
 from io import StringIO
 from typing import Optional
 from aiohttp import web
-from prometheus_client import generate_latest as prom_generate_latest, Gauge
+from prometheus_client import generate_latest as prom_generate_latest

 from lbry.dht.constants import generate_id
 from lbry.dht.node import Node
@@ -15,14 +16,6 @@ from lbry.conf import Config

 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
 log = logging.getLogger(__name__)
-BLOBS_STORED = Gauge(
-    "blobs_stored", "Number of blob info received", namespace="dht_node",
-    labelnames=("method",)
-)
-PEERS = Gauge(
-    "known_peers", "Number of peers on routing table", namespace="dht_node",
-    labelnames=("method",)
-)


 class SimpleMetrics:
@@ -30,7 +23,7 @@ class SimpleMetrics:
         self.prometheus_port = port
         self.dht_node: Node = node

-    async def handle_metrics_get_request(self, request: web.Request):
+    async def handle_metrics_get_request(self, _):
         try:
             return web.Response(
                 text=prom_generate_latest().decode(),
@@ -40,7 +33,7 @@ class SimpleMetrics:
             log.exception('could not generate prometheus data')
             raise

-    async def handle_peers_csv(self, request: web.Request):
+    async def handle_peers_csv(self, _):
         out = StringIO()
         writer = csv.DictWriter(out, fieldnames=["ip", "port", "dht_id"])
         writer.writeheader()
@@ -48,7 +41,7 @@ class SimpleMetrics:
             writer.writerow({"ip": peer.address, "port": peer.udp_port, "dht_id": peer.node_id.hex()})
         return web.Response(text=out.getvalue(), content_type='text/csv')

-    async def handle_blobs_csv(self, request: web.Request):
+    async def handle_blobs_csv(self, _):
         out = StringIO()
         writer = csv.DictWriter(out, fieldnames=["blob_hash"])
         writer.writeheader()
@@ -59,17 +52,28 @@ class SimpleMetrics:
     async def start(self):
         prom_app = web.Application()
         prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
-        prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
-        prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
+        if self.dht_node:
+            prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
+            prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
         metrics_runner = web.AppRunner(prom_app)
         await metrics_runner.setup()
         prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)
         await prom_site.start()


-async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int):
+async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int, export: bool):
     loop = asyncio.get_event_loop()
     conf = Config()
+    if not db_file_path.startswith(':memory:'):
+        node_id_file_path = db_file_path + 'node_id'
+        if os.path.exists(node_id_file_path):
+            with open(node_id_file_path, 'rb') as node_id_file:
+                node_id = node_id_file.read()
+        else:
+            with open(node_id_file_path, 'wb') as node_id_file:
+                node_id = generate_id()
+                node_id_file.write(node_id)
+
     storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
     if bootstrap_node:
         nodes = bootstrap_node.split(':')
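The hunk above makes the script's node id persistent: it is read from a `node_id` file stored next to the database when one exists, and generated and saved otherwise, so the node keeps the same DHT identity across restarts. A sketch of that logic, with `os.urandom(48)` standing in for lbry's `generate_id()` (an assumption based on lbry's 48-byte ids):

```python
import os

def load_or_create_node_id(db_file_path: str) -> bytes:
    # mirrors the diff's naming: the id lives at <db path> + 'node_id'
    node_id_file_path = db_file_path + 'node_id'
    if os.path.exists(node_id_file_path):
        with open(node_id_file_path, 'rb') as node_id_file:
            return node_id_file.read()
    node_id = os.urandom(48)  # stand-in for lbry.dht.constants.generate_id()
    with open(node_id_file_path, 'wb') as node_id_file:
        node_id_file.write(node_id)
    return node_id

print(load_or_create_node_id('/tmp/dht.db').hex())  # same id on every run
```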
@@ -78,17 +82,16 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
         nodes = conf.known_dht_nodes
     await storage.open()
     node = Node(
-        loop, PeerManager(loop), generate_id(), port, port, 3333, None,
+        loop, PeerManager(loop), node_id, port, port, 3333, None,
         storage=storage
     )
     if prometheus_port > 0:
-        metrics = SimpleMetrics(prometheus_port, node)
+        metrics = SimpleMetrics(prometheus_port, node if export else None)
         await metrics.start()
     node.start(host, nodes)
+    log.info("Peer with id %s started", node_id.hex())
     while True:
         await asyncio.sleep(10)
-        PEERS.labels('main').set(len(node.protocol.routing_table.get_peers()))
-        BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts()))
         log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
                  len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
                  len(node.protocol.data_store.get_storing_contacts()))
@@ -103,6 +106,7 @@ if __name__ == '__main__':
     parser.add_argument("--bootstrap_node", default=None, type=str,
                         help="Node to connect for bootstrapping this node. Leave unset to use the default ones. "
                              "Format: host:port Example: lbrynet1.lbry.com:4444")
-    parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus and raw CSV metrics. 0 to disable. Default: 0")
+    parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus metrics. 0 to disable. Default: 0")
+    parser.add_argument("--enable_csv_export", action='store_true', help="Enable CSV endpoints on metrics server.")
     args = parser.parse_args()
-    asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port))
+    asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port, args.enable_csv_export))
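The remaining hunks drop the script's ad-hoc module-level gauges (now redundant with the in-library metrics) and gate the CSV endpoints behind the new `--enable_csv_export` flag by passing `None` instead of the node to `SimpleMetrics`. A minimal aiohttp sketch of that gating; `SimpleMetricsSketch` and its placeholder CSV body are hypothetical, only the pattern follows the diff:

```python
import asyncio
from aiohttp import web
from prometheus_client import generate_latest

class SimpleMetricsSketch:
    def __init__(self, port: int, node=None):
        self.prometheus_port = port
        self.dht_node = node  # None means "export disabled": CSV routes are skipped

    async def handle_metrics_get_request(self, _):
        return web.Response(text=generate_latest().decode(), content_type='text/plain')

    async def handle_peers_csv(self, _):
        return web.Response(text="ip,port,dht_id\n", content_type='text/csv')  # placeholder rows

    async def start(self):
        app = web.Application()
        app.router.add_get('/metrics', self.handle_metrics_get_request)  # always exposed
        if self.dht_node:  # CSV endpoints only when export is enabled
            app.router.add_get('/peers.csv', self.handle_peers_csv)
        runner = web.AppRunner(app)
        await runner.setup()
        await web.TCPSite(runner, "0.0.0.0", self.prometheus_port).start()

async def demo():
    await SimpleMetricsSketch(8080, node=None).start()  # /peers.csv returns 404 here
    await asyncio.sleep(60)

if __name__ == '__main__':
    asyncio.run(demo())
```

Keeping `/metrics` unconditional while hiding the CSV dumps matches the flag's intent: Prometheus scraping stays cheap, and the potentially large raw exports are opt-in.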