forked from LBRYCommunity/lbry-sdk
use a histogram for latency, remove labels
This commit is contained in:
parent ab67f417ee
commit 0b059a5445
1 changed file with 17 additions and 23 deletions
@@ -9,7 +9,7 @@ import typing
 from dataclasses import dataclass, astuple, replace

 from aiohttp import web
-from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter
+from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter, Histogram

 import lbry.dht.error
 from lbry.dht.constants import generate_id
@@ -133,43 +133,37 @@ def new_node(address="0.0.0.0", udp_port=0, node_id=None):
 class Crawler:
     unique_total_hosts_metric = Gauge(
         "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     reachable_hosts_metric = Gauge(
         "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     total_historic_hosts_metric = Gauge(
         "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     pending_check_hosts_metric = Gauge(
         "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     hosts_with_errors_metric = Gauge(
         "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     connections_found_metric = Gauge(
         "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node",
-        labelnames=("host", "port")
     )
-    host_latency_metric = Gauge(
-        "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node",
-        labelnames=("host", "port")
+    LATENCY_HISTOGRAM_BUCKETS = (
+        0., 5., 10., 15., 30., 60., 120., 180., 240., 300., 600., 1200., 1800., 4000., 6000., float('inf')
+    )
+    host_latency_metric = Histogram(
+        "host_latency", "Time spent on the last request, in milliseconds.", namespace="dht_crawler_node",
+        buckets=LATENCY_HISTOGRAM_BUCKETS
     )
     probed_streams_metric = Counter(
         "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     announced_streams_metric = Counter(
         "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )
     working_streams_metric = Counter(
         "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node",
-        labelnames=("scope",)
     )

     def __init__(self, db_path: str, sd_hash_samples: SDHashSamples):
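For context, a minimal sketch of how a prometheus_client Histogram with explicit buckets behaves. The metric, namespace, and registry below are illustrative stand-ins, not the crawler's own objects:

    from prometheus_client import CollectorRegistry, Histogram, generate_latest

    registry = CollectorRegistry()  # isolated registry so the example is self-contained
    latency = Histogram(
        "host_latency", "Request latency in milliseconds.",
        namespace="example", buckets=(0., 5., 10., 15., 30., 60., float('inf')),
        registry=registry,
    )

    latency.observe(12.5)   # counted in every bucket with le >= 12.5 (15, 30, 60, +Inf)
    latency.observe(200.0)  # only the +Inf bucket catches this one

    # generate_latest(registry) renders example_host_latency_bucket{le="..."},
    # example_host_latency_sum and example_host_latency_count for scraping.
    print(generate_latest(registry).decode())

Unlike the old per-host Gauge, a single unlabeled histogram keeps the number of exported series fixed regardless of how many hosts the crawler contacts.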
@@ -226,11 +220,11 @@ class Crawler:
                     log.info("Found dead peer for %s: %s:%d(%d)",
                              sd_hash.hex()[:8], blob_peer.address,
                              blob_peer.udp_port or -1, blob_peer.tcp_port or -1)
-            self.probed_streams_metric.labels("global").inc()
+            self.probed_streams_metric.inc()
             if found:
-                self.announced_streams_metric.labels("global").inc()
+                self.announced_streams_metric.inc()
             if working:
-                self.working_streams_metric.labels("global").inc()
+                self.working_streams_metric.inc()
             log.info("Done querying stream %s for peers. Found: %s, working: %s", sd_hash.hex()[:8], found, working)
             await asyncio.sleep(.5)

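Since the counters no longer declare labelnames, the call sites drop the `.labels("global")` step and increment the single series directly. A small standalone sketch (hypothetical registry and metric, not the crawler's own):

    from prometheus_client import CollectorRegistry, Counter

    registry = CollectorRegistry()
    probed = Counter("probed_streams", "Amount of streams probed.",
                     namespace="example", registry=registry)

    probed.inc()  # increments the single unlabeled series
    # probed.labels("global") would now raise ValueError, because no labelnames exist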
@@ -293,7 +287,7 @@ class Crawler:

     def set_latency(self, peer, latency=None):
         if latency:
-            self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency)
+            self.host_latency_metric.observe(latency / 1_000_000.0)
         db_peer = self.get_from_peer(peer)
         if not db_peer:
             return
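The latency handed to set_latency() is in nanoseconds (that was the unit of the old gauge), while the histogram buckets above are in milliseconds, hence the division by 1,000,000 before observing. A quick sanity check of the conversion, with an illustrative value:

    latency_ns = 42_000_000                 # hypothetical 42 ms round trip
    latency_ms = latency_ns / 1_000_000.0
    assert latency_ms == 42.0               # would be counted in the le=60 bucket and above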
@@ -405,7 +399,7 @@ class Crawler:
                          host, port, (time.time() - start), len(peers), i)
             self.add_peers(*peers)
             if peers:
-                self.connections_found_metric.labels(host=host, port=port).set(len(peers))
+                self.connections_found_metric.set(len(peers))
             self.associate_peers(peer, peers)
             return peers

@@ -426,11 +420,11 @@ class Crawler:
                     submit(peer)
                     await asyncio.sleep(.05)
             await asyncio.sleep(0)
-            self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count)
-            self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count)
-            self.total_historic_hosts_metric.labels("global").set(len(self._memory_peers))
-            self.pending_check_hosts_metric.labels("global").set(len(to_check))
-            self.hosts_with_errors_metric.labels("global").set(self.peers_with_errors_count)
+            self.unique_total_hosts_metric.set(self.checked_peers_count)
+            self.reachable_hosts_metric.set(self.checked_peers_count - self.unreachable_peers_count)
+            self.total_historic_hosts_metric.set(len(self._memory_peers))
+            self.pending_check_hosts_metric.set(len(to_check))
+            self.hosts_with_errors_metric.set(self.peers_with_errors_count)
             log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue",
                      self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count,
                      self.peers_with_errors_count, len(to_process), len(to_check))