diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 718e339f5..6a13b5346 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -3,12 +3,13 @@ import socket import functools import hashlib import asyncio +import time import typing import random from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport -from prometheus_client import Gauge +from prometheus_client import Gauge, Counter, Histogram from lbry.dht import constants from lbry.dht.serialization.bencoding import DecodeError @@ -267,6 +268,18 @@ class PingQueue: class KademliaProtocol(DatagramProtocol): + requests_sent_metric = Counter( + "request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node", + labelnames=("method",), + ) + HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf') + ) + response_time_metric = Histogram( + "response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS, + labelnames=("method",) + ) + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): @@ -583,7 +596,10 @@ class KademliaProtocol(DatagramProtocol): self._send(peer, request) response_fut = self.sent_messages[request.rpc_id][1] try: + self.requests_sent_metric.labels(method=request.method).inc() + start = time.perf_counter() response = await asyncio.wait_for(response_fut, self.rpc_timeout) + self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start) self.peer_manager.report_last_replied(peer.address, peer.udp_port) return response except asyncio.CancelledError: