From df77392fe0067983b3b034aafc77e84a0041cb60 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 23 Jul 2022 14:58:02 -0300
Subject: [PATCH] dht crawler: improve logging, metrics, make startup concurrent

---
 scripts/dht_crawler.py | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
index 64261f007..3345a6852 100644
--- a/scripts/dht_crawler.py
+++ b/scripts/dht_crawler.py
@@ -158,15 +158,15 @@ class Crawler:
     )
     probed_streams_metric = Counter(
         "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node",
-        labelnames=("sd_hash",)
+        labelnames=("scope",)
     )
     announced_streams_metric = Counter(
         "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node",
-        labelnames=("sd_hash",)
+        labelnames=("scope",)
     )
     working_streams_metric = Counter(
         "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node",
-        labelnames=("sd_hash",)
+        labelnames=("scope",)
     )
 
     def __init__(self, db_path: str, sd_hash_samples: SDHashSamples):
@@ -195,7 +195,7 @@ class Crawler:
         while True:
             for sd_hash in self.sd_hashes.read_samples(10_000):
                 self.refresh_reachable_set()
-                log.info("Querying stream %s for peers.", sd_hash[:8])
+                log.info("Querying stream %s for peers.", sd_hash.hex()[:8])
                 distance = Distance(sd_hash)
                 node_ids = list(self._reachable_by_node_id.keys())
                 node_ids.sort(key=lambda node_id: distance(node_id))
@@ -203,20 +203,20 @@
                 for response in asyncio.as_completed(
                         [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]):
                     response = await response
-                    self.probed_streams_metric.labels(sd_hash).inc()
+                    self.probed_streams_metric.labels("global").inc()
                     if response and response.found:
-                        self.announced_streams_metric.labels(sd_hash).inc()
+                        self.announced_streams_metric.labels("global").inc()
                         blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
                                       for compact_addr in response.found_compact_addresses]
                         for blob_peer in blob_peers:
                             response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash)
                             if response:
-                                self.working_streams_metric.labels(sd_hash).inc()
+                                self.working_streams_metric.labels("global").inc()
                                 log.info("Found responsive peer for %s: %s:%d(%d)",
-                                         sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
+                                         sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
                             else:
                                 log.info("Found dead peer for %s: %s:%d(%d)",
-                                         sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
+                                         sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port)
                 await asyncio.sleep(.5)
 
     @property
@@ -469,8 +469,10 @@ async def test():
         await crawler.flush_to_db()
     await crawler.node.start_listening()
     if crawler.active_peers_count < 100:
+        probes = []
         for (host, port) in conf.known_dht_nodes:
-            await crawler.crawl_routing_table(host, port)
+            probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port)))
+        await asyncio.gather(*probes)
     probe_task = asyncio.ensure_future(crawler.probe_files())
     await crawler.process()
 
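
The final hunk replaces the sequential bootstrap loop with one asyncio task per known DHT node, awaited together, so startup time is bounded by the slowest bootstrap node rather than the sum of all of them. A minimal, self-contained sketch of the same pattern, with a hypothetical probe() coroutine standing in for crawler.crawl_routing_table():

    import asyncio

    # Hypothetical stand-in for crawler.crawl_routing_table(host, port);
    # the sleep is a placeholder for the real DHT round trips.
    async def probe(host: str, port: int) -> str:
        await asyncio.sleep(0.1)
        return f"{host}:{port}"

    async def bootstrap(known_nodes):
        # One task per bootstrap node, all started up front, so slow or
        # unreachable nodes no longer serialize startup.
        probes = [asyncio.create_task(probe(host, port)) for host, port in known_nodes]
        # gather() waits for every probe and raises the first exception by
        # default, matching the patch's plain `await asyncio.gather(*probes)`.
        return await asyncio.gather(*probes)

    print(asyncio.run(bootstrap([("dht1.example.net", 4444), ("dht2.example.net", 4444)])))

The metric hunks serve a different goal: labelling the counters with a constant "scope" value instead of the sd_hash means one Prometheus time series per counter rather than one per probed stream, presumably to keep label cardinality bounded.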