forked from LBRYCommunity/lbry-sdk
dht_crawler: serve prometheus metrics at 7070
parent 0497698c5b
commit cfe5c8de8a
1 changed file with 64 additions and 0 deletions
@@ -4,6 +4,9 @@ import asyncio
 import time
 import typing
 
+from aiohttp import web
+from prometheus_client import Gauge, Counter, generate_latest as prom_generate_latest
+
 import lbry.dht.error
 from lbry.dht.constants import generate_id
 from lbry.dht.node import Node
@@ -71,6 +74,34 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
 
 
 class Crawler:
+    unique_total_hosts_metric = Gauge(
+        "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node",
+        labelnames=("scope",)
+    )
+    reachable_hosts_metric = Gauge(
+        "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node",
+        labelnames=("scope",)
+    )
+    total_historic_hosts_metric = Gauge(
+        "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node",
+        labelnames=("scope",)
+    )
+    pending_check_hosts_metric = Gauge(
+        "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node",
+        labelnames=("scope",)
+    )
+    hosts_with_errors_metric = Gauge(
+        "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node",
+        labelnames=("scope",)
+    )
+    connections_found_metric = Gauge(
+        "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node",
+        labelnames=("host", "port")
+    )
+    host_latency_metric = Gauge(
+        "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node",
+        labelnames=("host", "port")
+    )
     def __init__(self, db_path: str):
         self.node = new_node()
         self.semaphore = asyncio.Semaphore(200)
@@ -134,6 +165,8 @@ class Crawler:
         return self._memory_peers.get((peer.address, peer.udp_port), None)
 
     def set_latency(self, peer, latency=None):
+        if latency:
+            self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency)
         db_peer = self.get_from_peer(peer)
         db_peer.latency = latency
         if not db_peer.node_id and peer.node_id:
@@ -227,6 +260,7 @@ class Crawler:
         log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
                  host, port, (time.time() - start), len(peers), i)
         self.add_peers(*peers)
+        self.connections_found_metric.labels(host=host, port=port).set(len(peers))
         #self.associate_peers(this_peer_id, db_peer_ids)
         self.db.commit()
         return peers
@@ -250,6 +284,11 @@ class Crawler:
                 if len(to_process) > 100:
                     break
                 await asyncio.sleep(0)
+            self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count)
+            self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count)
+            self.total_historic_hosts_metric.labels("global").set(len(self._memory_peers))
+            self.pending_check_hosts_metric.labels("global").set(len(to_check))
+            self.hosts_with_errors_metric.labels("global").set(self.peers_with_errors_count)
             log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue",
                      self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count,
                      self.peers_with_errors_count, len(to_process), len(to_check))
@@ -266,7 +305,32 @@ class Crawler:
         to_check = self.get_peers_needing_check()
 
 
+class SimpleMetrics:
+    def __init__(self, port):
+        self.prometheus_port = port
+
+    async def handle_metrics_get_request(self, _):
+        try:
+            return web.Response(
+                text=prom_generate_latest().decode(),
+                content_type='text/plain; version=0.0.4'
+            )
+        except Exception:
+            log.exception('could not generate prometheus data')
+            raise
+
+    async def start(self):
+        prom_app = web.Application()
+        prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
+        metrics_runner = web.AppRunner(prom_app)
+        await metrics_runner.setup()
+        prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)
+        await prom_site.start()
+
+
 async def test():
+    metrics = SimpleMetrics('7070')
+    await metrics.start()
     crawler = Crawler("/tmp/a.db")
     await crawler.node.start_listening()
     conf = Config()
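A quick way to sanity-check the new endpoint is to scrape it while the crawler is running. The minimal sketch below is not part of the commit: prometheus_client joins the namespace and metric name with an underscore, so the gauges above are exposed as dht_crawler_node_unique_total_hosts, dht_crawler_node_reachable_hosts, and so on. The helper name fetch_dht_crawler_metrics and the localhost address are illustrative assumptions; port 7070 matches the test() setup above.

import urllib.request

def fetch_dht_crawler_metrics(port: int = 7070) -> str:
    # Hypothetical helper: fetch the plain-text exposition from the
    # SimpleMetrics aiohttp server started by test() above.
    with urllib.request.urlopen(f"http://127.0.0.1:{port}/metrics") as resp:
        return resp.read().decode()

if __name__ == "__main__":
    # Print only the crawler's own gauges (namespace "dht_crawler_node").
    for line in fetch_dht_crawler_metrics().splitlines():
        if line.startswith("dht_crawler_node_"):
            print(line)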