From 2361e345412e537fbf2b4fda89f9bbac84e7616f Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 10 Jun 2022 17:16:20 -0300
Subject: [PATCH] dht crawler, initial version

---
 scripts/dht_crawler.py | 133 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 133 insertions(+)
 create mode 100644 scripts/dht_crawler.py

diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
new file mode 100644
index 000000000..8ab620385
--- /dev/null
+++ b/scripts/dht_crawler.py
@@ -0,0 +1,133 @@
+import logging
+import asyncio
+import time
+import typing
+
+import lbry.dht.error
+from lbry.dht.constants import generate_id
+from lbry.dht.node import Node
+from lbry.dht.peer import make_kademlia_peer, PeerManager, KademliaPeer
+from lbry.dht.protocol.distance import Distance
+from lbry.conf import Config
+from lbry.utils import resolve_host
+
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
+log = logging.getLogger(__name__)
+
+
+def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
+    node_id = node_id or generate_id()
+    loop = asyncio.get_event_loop()
+    return Node(loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address)
+
+
+class Crawler:
+    def __init__(self):
+        self.node = new_node()
+        self.crawled = set()
+        self.known_peers = set()
+        self.unreachable = set()
+        self.error = set()
+        self.semaphore = asyncio.Semaphore(10)
+
+    async def request_peers(self, host, port, key) -> typing.List[KademliaPeer]:
+        async with self.semaphore:
+            peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port)
+            for attempt in range(3):
+                try:
+                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                    return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
+                except asyncio.TimeoutError:
+                    log.info('Previously responding peer timed out: %s:%d attempt #%d', host, port, attempt + 1)
+                    continue
+                except lbry.dht.error.RemoteException as e:
+                    log.info('Previously responding peer errored: %s:%d attempt #%d - %s',
+                             host, port, attempt + 1, str(e))
+                    self.error.add((host, port))
+                    continue
+            return []
+
+    async def crawl_routing_table(self, host, port):
+        start = time.time()
+        log.info("querying %s:%d", host, port)
+        self.known_peers.add((host, port))
+        self.crawled.add((host, port))
+        address = await resolve_host(host, port, 'udp')
+        key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+        if not key:
+            # node id still unknown: ping up to three times so the peer manager learns it
+            for _ in range(3):
+                try:
+                    async with self.semaphore:
+                        await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
+                    key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+                except asyncio.TimeoutError:
+                    pass
+                except lbry.dht.error.RemoteException:
+                    self.error.add((host, port))
+            if not key:
+                self.unreachable.add((host, port))
+                return set()
+        node_id = key
+        distance = Distance(key)
+        max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
+        peers = set()
+        factor = 2048
+        for i in range(200):
+            # print(i, len(peers), key.hex(), host)
+            new_peers = await self.request_peers(address, port, key)
+            if not new_peers:
+                break
+            new_peers.sort(key=lambda peer: distance(peer.node_id))
+            peers.update(new_peers)
+            far_key = new_peers[-1].node_id
+            if distance(far_key) <= distance(key):
+                current_distance = distance(key)
+                next_jump = current_distance + int(max_distance // factor)  # jump closer
+                factor /= 2
+                if factor > 8 and next_jump < max_distance:
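+                    # in Kademlia's XOR metric, node_id ^ next_jump is a key
+                    # exactly next_jump away from the crawled node, so each
+                    # jump re-aims the lookup at a farther, unqueried bucket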
+                    key = int.from_bytes(node_id, 'big') ^ next_jump
+                    if key.bit_length() > 384:
+                        break
+                    key = key.to_bytes(48, 'big')
+                else:
+                    break
+            else:
+                key = far_key
+                factor = 2048
+        log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
+                 host, port, time.time() - start, len(peers), i)
+        self.crawled.update(peers)
+        return peers
+
+    async def process(self):
+        to_process = {}
+
+        def submit(_peer):
+            f = asyncio.ensure_future(self.crawl_routing_table(_peer.address, _peer.udp_port))
+            to_process[_peer] = f
+            f.add_done_callback(lambda _: to_process.pop(_peer))
+
+        while to_process or len(self.known_peers) < len(self.crawled):
+            log.info("%d known, %d unreachable, %d error, %d processing",
+                     len(self.known_peers), len(self.unreachable), len(self.error), len(to_process))
+            for peer in self.crawled.difference(self.known_peers):
+                self.known_peers.add(peer)
+                submit(peer)
+            await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
+
+
+async def test():
+    crawler = Crawler()
+    await crawler.node.start_listening()
+    conf = Config()
+    for (host, port) in conf.known_dht_nodes:
+        await crawler.crawl_routing_table(host, port)
+    await crawler.process()
+
+if __name__ == '__main__':
+    asyncio.run(test())
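
A quick way to try it, assuming an lbry-sdk checkout with its dependencies
installed: the script bootstraps from conf.known_dht_nodes (the SDK's default
DHT seed nodes) and then keeps crawling every newly discovered peer until the
queue drains.

    python scripts/dht_crawler.py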