dht crawler: improve logging, metrics, make startup concurrent

Victor Shyba 2022-07-23 14:58:02 -03:00 committed by Victor Shyba
parent 9aa9ecdc0a
commit df77392fe0


@@ -158,15 +158,15 @@ class Crawler:
     )
     probed_streams_metric = Counter(
         "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node",
-        labelnames=("sd_hash",)
+        labelnames=("scope",)
     )
     announced_streams_metric = Counter(
         "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node",
-        labelnames=("sd_hash",)
+        labelnames=("scope",)
     )
     working_streams_metric = Counter(
         "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node",
-        labelnames=("sd_hash",)
+        labelnames=("scope",)
     )

     def __init__(self, db_path: str, sd_hash_samples: SDHashSamples):
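Note on the label change: Prometheus creates a separate time series for every distinct label value, so labeling these counters with sd_hash would mint a new series per probed stream and grow the registry without bound; a fixed "scope" label keeps cardinality constant. A minimal sketch of the difference, assuming the prometheus_client package that this Counter API comes from:

# Sketch only, assuming prometheus_client. Each distinct label value
# becomes its own time series in the registry.
from prometheus_client import Counter

probed = Counter(
    "probed_streams", "Amount of streams probed.",
    namespace="dht_crawler_node", labelnames=("scope",)
)

# New pattern: one bounded series, no matter how many streams are probed.
probed.labels("global").inc()

# Old pattern (for contrast): one fresh series per stream hash, e.g.
#   probed.labels(sd_hash).inc()   # unbounded cardinality as samples grow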
@@ -195,7 +195,7 @@ class Crawler:
         while True:
             for sd_hash in self.sd_hashes.read_samples(10_000):
                 self.refresh_reachable_set()
-                log.info("Querying stream %s for peers.", sd_hash[:8])
+                log.info("Querying stream %s for peers.", sd_hash.hex()[:8])
                 distance = Distance(sd_hash)
                 node_ids = list(self._reachable_by_node_id.keys())
                 node_ids.sort(key=lambda node_id: distance(node_id))
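Note on the logging change: sd_hash is raw bytes, so the old %s formatting of sd_hash[:8] printed an escape-heavy bytes repr; .hex()[:8] yields a clean eight-character hex prefix instead. An illustrative snippet with a made-up hash:

# Hypothetical 4-byte hash, for illustration only.
sd_hash = bytes.fromhex("9aa9ecdc")

"%s" % sd_hash[:8]        # "b'\x9a\xa9\xec\xdc'"  -- escape-heavy bytes repr
"%s" % sd_hash.hex()[:8]  # "9aa9ecdc"             -- readable hex prefix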
@@ -203,20 +203,20 @@ class Crawler:
                 for response in asyncio.as_completed(
                         [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]):
                     response = await response
-                    self.probed_streams_metric.labels(sd_hash).inc()
+                    self.probed_streams_metric.labels("global").inc()
                     if response and response.found:
-                        self.announced_streams_metric.labels(sd_hash).inc()
+                        self.announced_streams_metric.labels("global").inc()
                         blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
                                       for compact_addr in response.found_compact_addresses]
                         for blob_peer in blob_peers:
                             response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash)
                             if response:
-                                self.working_streams_metric.labels(sd_hash).inc()
+                                self.working_streams_metric.labels("global").inc()
                                 log.info("Found responsive peer for %s: %s:%d(%d)",
-                                         sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
+                                         sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
                             else:
                                 log.info("Found dead peer for %s: %s:%d(%d)",
-                                         sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
+                                         sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
                 await asyncio.sleep(.5)

     @property
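For context, the probe loop above fans out request_peers calls to the k closest reachable nodes and consumes replies in completion order via asyncio.as_completed, rather than waiting on the slowest peer. A self-contained sketch of that pattern, with a hypothetical probe() standing in for the UDP query:

# Sketch of the as_completed fan-out pattern; probe() is hypothetical.
import asyncio

async def probe(peer_id: int) -> str:
    await asyncio.sleep(peer_id * 0.1)  # stand-in for a UDP round trip
    return f"reply from peer {peer_id}"

async def main():
    # All queries are in flight at once; replies arrive as they complete.
    for fut in asyncio.as_completed([probe(i) for i in (3, 1, 2)]):
        print(await fut)  # prints peers 1, 2, 3 in completion order

asyncio.run(main())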
@@ -469,8 +469,10 @@ async def test():
     await crawler.flush_to_db()
     await crawler.node.start_listening()
     if crawler.active_peers_count < 100:
+        probes = []
         for (host, port) in conf.known_dht_nodes:
-            await crawler.crawl_routing_table(host, port)
+            probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port)))
+        await asyncio.gather(*probes)
     probe_task = asyncio.ensure_future(crawler.probe_files())
     await crawler.process()
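Note on the startup change: the old loop awaited each bootstrap node's routing-table crawl in sequence, so startup cost the sum of all crawl times; create_task plus gather runs the crawls concurrently, so startup costs roughly the slowest one. A minimal sketch of the difference, with a hypothetical crawl() in place of crawl_routing_table:

# Sketch only; crawl() is a hypothetical stand-in for crawl_routing_table.
import asyncio

async def crawl(host: str) -> None:
    await asyncio.sleep(1)  # stand-in for crawling one bootstrap node

async def sequential(hosts):
    for host in hosts:  # ~len(hosts) seconds total
        await crawl(host)

async def concurrent(hosts):
    tasks = [asyncio.create_task(crawl(host)) for host in hosts]
    await asyncio.gather(*tasks)  # ~1 second total, all crawls in flight

asyncio.run(concurrent(["dht1.example", "dht2.example"]))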