add request received

This commit is contained in:
Victor Shyba 2021-12-13 04:29:29 -03:00
parent 645a81cec2
commit d359d25935
2 changed files with 20 additions and 10 deletions

View file

@ -33,7 +33,7 @@ OLD_PROTOCOL_ERRORS = {
class KademliaRPC: class KademliaRPC:
stored_blobs_metric = Gauge( stored_blob_metric = Gauge(
"stored_blobs", "Number of blobs announced by other peers", namespace="dht_node", "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
labelnames=("scope",), labelnames=("scope",),
) )
@ -69,7 +69,7 @@ class KademliaRPC:
self.protocol.data_store.add_peer_to_blob( self.protocol.data_store.add_peer_to_blob(
rpc_contact, blob_hash rpc_contact, blob_hash
) )
self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store)) self.stored_blob_metric.labels("global").set(len(self.protocol.data_store))
return b'OK' return b'OK'
def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
@ -268,10 +268,14 @@ class PingQueue:
class KademliaProtocol(DatagramProtocol): class KademliaProtocol(DatagramProtocol):
requests_sent_metric = Counter( request_sent_metric = Counter(
"request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node", "request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node",
labelnames=("method",), labelnames=("method",),
) )
request_success_metric = Counter(
"request_success", "Number of successful requests", namespace="dht_node",
labelnames=("method",),
)
HISTOGRAM_BUCKETS = ( HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf') .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf')
) )
@ -279,6 +283,10 @@ class KademliaProtocol(DatagramProtocol):
"response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS, "response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS,
labelnames=("method",) labelnames=("method",)
) )
received_request_metric = Counter(
"received_request", "Number of received DHT RPC requests", namespace="dht_node",
labelnames=("method",),
)
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT, udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
@ -468,6 +476,7 @@ class KademliaProtocol(DatagramProtocol):
def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
# This is an RPC method request # This is an RPC method request
self.received_request_metric.labels(method=request_datagram.method).inc()
self.peer_manager.report_last_requested(address[0], address[1]) self.peer_manager.report_last_requested(address[0], address[1])
try: try:
peer = self.routing_table.get_peer(request_datagram.node_id) peer = self.routing_table.get_peer(request_datagram.node_id)
@ -596,11 +605,12 @@ class KademliaProtocol(DatagramProtocol):
self._send(peer, request) self._send(peer, request)
response_fut = self.sent_messages[request.rpc_id][1] response_fut = self.sent_messages[request.rpc_id][1]
try: try:
self.requests_sent_metric.labels(method=request.method).inc() self.request_sent_metric.labels(method=request.method).inc()
start = time.perf_counter() start = time.perf_counter()
response = await asyncio.wait_for(response_fut, self.rpc_timeout) response = await asyncio.wait_for(response_fut, self.rpc_timeout)
self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start) self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
self.peer_manager.report_last_replied(peer.address, peer.udp_port) self.peer_manager.report_last_replied(peer.address, peer.udp_port)
self.request_success_metric.labels(method=request.method).inc()
return response return response
except asyncio.CancelledError: except asyncio.CancelledError:
if not response_fut.done(): if not response_fut.done():

View file

@ -18,7 +18,7 @@ class KBucket:
""" """
Kademlia K-bucket implementation. Kademlia K-bucket implementation.
""" """
peers_in_routing_table_metric = Gauge( peer_in_routing_table_metric = Gauge(
"peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
labelnames=("scope",) labelnames=("scope",)
) )
@ -65,7 +65,7 @@ class KBucket:
return True return True
if len(self.peers) < constants.K: if len(self.peers) < constants.K:
self.peers.append(peer) self.peers.append(peer)
self.peers_in_routing_table_metric.labels("global").inc() self.peer_in_routing_table_metric.labels("global").inc()
return True return True
else: else:
return False return False
@ -132,7 +132,7 @@ class KBucket:
def remove_peer(self, peer: 'KademliaPeer') -> None: def remove_peer(self, peer: 'KademliaPeer') -> None:
self.peers.remove(peer) self.peers.remove(peer)
self.peers_in_routing_table_metric.labels("global").dec() self.peer_in_routing_table_metric.labels("global").dec()
def key_in_range(self, key: bytes) -> bool: def key_in_range(self, key: bytes) -> bool:
""" Tests whether the specified key (i.e. node ID) is in the range """ Tests whether the specified key (i.e. node ID) is in the range
@ -171,7 +171,7 @@ class TreeRoutingTable:
ping RPC-based k-bucket eviction algorithm described in section 2.2 of ping RPC-based k-bucket eviction algorithm described in section 2.2 of
that paper. that paper.
""" """
buckets_in_routing_table_metric = Gauge( bucket_in_routing_table_metric = Gauge(
"buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
labelnames=("scope",) labelnames=("scope",)
) )
@ -292,7 +292,7 @@ class TreeRoutingTable:
# ...and remove them from the old bucket # ...and remove them from the old bucket
for contact in new_bucket.peers: for contact in new_bucket.peers:
old_bucket.remove_peer(contact) old_bucket.remove_peer(contact)
self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
def join_buckets(self): def join_buckets(self):
if len(self.buckets) == 1: if len(self.buckets) == 1:
@ -316,7 +316,7 @@ class TreeRoutingTable:
elif can_go_higher: elif can_go_higher:
self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
self.buckets.remove(bucket) self.buckets.remove(bucket)
self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
return self.join_buckets() return self.join_buckets()
def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: