diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node index d44370fc5..ce91ec961 100644 --- a/docker/Dockerfile.dht_node +++ b/docker/Dockerfile.dht_node @@ -2,6 +2,7 @@ FROM debian:10-slim ARG user=lbry ARG projects_dir=/home/$user +ARG db_dir=/database ARG DOCKER_TAG ARG DOCKER_COMMIT=docker @@ -27,12 +28,16 @@ RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user COPY . $projects_dir RUN chown -R $user:$user $projects_dir +RUN mkdir -p $db_dir +RUN chown -R $user:$user $db_dir USER $user WORKDIR $projects_dir +RUN python3 -m pip install -U setuptools pip RUN make install RUN python3 docker/set_build.py RUN rm ~/.cache -rf +VOLUME $db_dir ENTRYPOINT ["python3", "scripts/dht_node.py"] diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index c5a9c9e84..db4635447 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -100,6 +100,9 @@ class PeerManager: self._node_id_reverse_mapping[node_id] = (address, udp_port) self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys()) + def get_node_id_for_endpoint(self, address, port): + return self._node_id_mapping.get((address, port)) + def prune(self): # TODO: periodically call this now = self._loop.time() to_pop = [] @@ -150,9 +153,10 @@ class PeerManager: def peer_is_good(self, peer: 'KademliaPeer'): return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port) - def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use - node_id, address, tcp_port = decode_compact_address(compact_address) - return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) + +def decode_tcp_peer_from_compact_address(compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use + node_id, address, tcp_port = decode_compact_address(compact_address) + return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) @dataclass(unsafe_hash=True) diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index d41b3b2a0..b9678ea1e 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING from lbry.dht import constants from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.distance import Distance -from lbry.dht.peer import make_kademlia_peer +from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address from lbry.dht.serialization.datagram import PAGE_KEY if TYPE_CHECKING: @@ -26,6 +26,15 @@ class FindResponse: def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]: raise NotImplementedError() + def get_close_kademlia_peers(self, peer_info) -> typing.Generator[typing.Iterator['KademliaPeer'], None, None]: + for contact_triple in self.get_close_triples(): + node_id, address, udp_port = contact_triple + try: + yield make_kademlia_peer(node_id, address, udp_port) + except ValueError: + log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address, + peer_info.udp_port, address, udp_port) + class FindNodeResponse(FindResponse): def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]): @@ -125,13 +134,8 @@ class IterativeFinder(AsyncIterator): async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) - for contact_triple in response.get_close_triples(): - node_id, address, udp_port = contact_triple - try: - self._add_active(make_kademlia_peer(node_id, address, udp_port)) - except ValueError: - log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address, - peer.udp_port, address, udp_port) + for new_peer in response.get_close_kademlia_peers(peer): + self._add_active(new_peer) self.check_result_ready(response) self._log_state(reason="check result") @@ -319,7 +323,7 @@ class IterativeValueFinder(IterativeFinder): decoded_peers = set() for compact_addr in parsed.found_compact_addresses: try: - decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)) + decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr)) except ValueError: log.warning("misbehaving peer %s:%i returned invalid peer for blob", peer.address, peer.udp_port) @@ -341,7 +345,7 @@ class IterativeValueFinder(IterativeFinder): def check_result_ready(self, response: FindValueResponse): if response.found: - blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) + blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) for compact_addr in response.found_compact_addresses] to_yield = [] for blob_peer in blob_peers: diff --git a/lbry/extras/daemon/client.py b/lbry/extras/daemon/client.py index 7f0997320..9e9b1694a 100644 --- a/lbry/extras/daemon/client.py +++ b/lbry/extras/daemon/client.py @@ -1,5 +1,5 @@ -from lbry.conf import Config from lbry.extras.cli import execute_command +from lbry.conf import Config def daemon_rpc(conf: Config, method: str, **kwargs): diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py new file mode 100644 index 000000000..f39d9c1bf --- /dev/null +++ b/scripts/dht_crawler.py @@ -0,0 +1,520 @@ +import sys +import datetime +import logging +import asyncio +import os.path +import random +import time +import typing +from dataclasses import dataclass, astuple, replace + +from aiohttp import web +from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter, Histogram + +import lbry.dht.error +from lbry.dht.constants import generate_id +from lbry.dht.node import Node +from lbry.dht.peer import make_kademlia_peer, PeerManager, decode_tcp_peer_from_compact_address +from lbry.dht.protocol.distance import Distance +from lbry.dht.protocol.iterative_find import FindValueResponse, FindNodeResponse, FindResponse +from lbry.extras.daemon.storage import SQLiteMixin +from lbry.conf import Config +from lbry.utils import resolve_host + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") +log = logging.getLogger(__name__) + + +class SDHashSamples: + def __init__(self, samples_file_path): + with open(samples_file_path, "rb") as sample_file: + self._samples = sample_file.read() + assert len(self._samples) % 48 == 0 + self.size = len(self._samples) // 48 + + def read_samples(self, count=1): + for _ in range(count): + offset = 48 * random.randrange(0, self.size) + yield self._samples[offset:offset + 48] + + +class PeerStorage(SQLiteMixin): + CREATE_TABLES_QUERY = """ + PRAGMA JOURNAL_MODE=WAL; + CREATE TABLE IF NOT EXISTS peer ( + peer_id INTEGER NOT NULL, + node_id VARCHAR(96), + address VARCHAR, + udp_port INTEGER, + tcp_port INTEGER, + first_online DATETIME, + errors INTEGER, + last_churn INTEGER, + added_on DATETIME NOT NULL, + last_check DATETIME, + last_seen DATETIME, + latency INTEGER, + PRIMARY KEY (peer_id) + ); + CREATE TABLE IF NOT EXISTS connection ( + from_peer_id INTEGER NOT NULL, + to_peer_id INTEGER NOT NULL, + PRIMARY KEY (from_peer_id, to_peer_id), + FOREIGN KEY(from_peer_id) REFERENCES peer (peer_id), + FOREIGN KEY(to_peer_id) REFERENCES peer (peer_id) + ); +""" + + async def open(self): + await super().open() + self.db.writer_connection.row_factory = dict_row_factory + + async def all_peers(self): + return [ + DHTPeer(**peer) for peer in await self.db.execute_fetchall( + "select * from peer where latency > 0 or last_seen > datetime('now', '-1 hour')") + ] + + async def save_peers(self, *peers): + log.info("Saving graph nodes (peers) to DB") + await self.db.executemany( + "INSERT OR REPLACE INTO peer(" + "node_id, address, udp_port, tcp_port, first_online, errors, last_churn," + "added_on, last_check, last_seen, latency, peer_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", + [astuple(peer) for peer in peers] + ) + log.info("Finished saving graph nodes (peers) to DB") + + async def save_connections(self, connections_map): + log.info("Saving graph edges (connections) to DB") + await self.db.executemany( + "DELETE FROM connection WHERE from_peer_id = ?", [(key,) for key in connections_map]) + for from_peer_id in connections_map: + await self.db.executemany( + "INSERT INTO connection(from_peer_id, to_peer_id) VALUES(?,?)", + [(from_peer_id, to_peer_id) for to_peer_id in connections_map[from_peer_id]]) + log.info("Finished saving graph edges (connections) to DB") + + +@dataclass(frozen=True) +class DHTPeer: + node_id: str + address: str + udp_port: int + tcp_port: int = None + first_online: datetime.datetime = None + errors: int = None + last_churn: int = None + added_on: datetime.datetime = None + last_check: datetime.datetime = None + last_seen: datetime.datetime = None + latency: int = None + peer_id: int = None + + @classmethod + def from_kad_peer(cls, peer, peer_id): + node_id = peer.node_id.hex() if peer.node_id else None + return DHTPeer( + node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port, + peer_id=peer_id, added_on=datetime.datetime.utcnow()) + + def to_kad_peer(self): + node_id = bytes.fromhex(self.node_id) if self.node_id else None + return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port) + + +def new_node(address="0.0.0.0", udp_port=0, node_id=None): + node_id = node_id or generate_id() + loop = asyncio.get_event_loop() + return Node(loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address) + + +class Crawler: + unique_total_hosts_metric = Gauge( + "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node", + ) + reachable_hosts_metric = Gauge( + "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node", + ) + total_historic_hosts_metric = Gauge( + "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node", + ) + pending_check_hosts_metric = Gauge( + "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node", + ) + hosts_with_errors_metric = Gauge( + "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node", + ) + ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS = tuple(map(float, range(100))) + ( + 500., 1000., 2000., float('inf') + ) + connections_found_metric = Histogram( + "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node", + buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS + ) + known_connections_found_metric = Histogram( + "known_connections_found", "Number of already known hosts returned by last contact.", + namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS + ) + reachable_connections_found_metric = Histogram( + "reachable_connections_found", "Number of reachable known hosts returned by last contact.", + namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS + ) + LATENCY_HISTOGRAM_BUCKETS = ( + 0., 5., 10., 15., 30., 60., 120., 180., 240., 300., 600., 1200., 1800., 4000., 6000., float('inf') + ) + host_latency_metric = Histogram( + "host_latency", "Time spent on the last request, in milliseconds.", namespace="dht_crawler_node", + buckets=LATENCY_HISTOGRAM_BUCKETS + ) + probed_streams_metric = Counter( + "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node", + ) + announced_streams_metric = Counter( + "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node", + ) + working_streams_metric = Counter( + "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node", + ) + + def __init__(self, db_path: str, sd_hash_samples: SDHashSamples): + self.node = new_node() + self.db = PeerStorage(db_path) + self.sd_hashes = sd_hash_samples + self._memory_peers = {} + self._reachable_by_node_id = {} + self._connections = {} + + async def open(self): + await self.db.open() + self._memory_peers = { + (peer.address, peer.udp_port): peer for peer in await self.db.all_peers() + } + self.refresh_reachable_set() + + def refresh_reachable_set(self): + self._reachable_by_node_id = { + bytes.fromhex(peer.node_id): peer for peer in self._memory_peers.values() if (peer.latency or 0) > 0 + } + + async def probe_files(self): + if not self.sd_hashes: + return + while True: + for sd_hash in self.sd_hashes.read_samples(10_000): + self.refresh_reachable_set() + distance = Distance(sd_hash) + node_ids = list(self._reachable_by_node_id.keys()) + node_ids.sort(key=lambda node_id: distance(node_id)) + k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]] + found = False + working = False + for response in asyncio.as_completed( + [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]): + response = await response + if response and response.found: + found = True + blob_peers = [] + for compact_addr in response.found_compact_addresses: + try: + blob_peers.append(decode_tcp_peer_from_compact_address(compact_addr)) + except ValueError as e: + log.error("Error decoding compact peers: %s", e) + for blob_peer in blob_peers: + response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) + if response: + working = True + log.info("Found responsive peer for %s: %s:%d(%d)", + sd_hash.hex()[:8], blob_peer.address, + blob_peer.udp_port or -1, blob_peer.tcp_port or -1) + else: + log.info("Found dead peer for %s: %s:%d(%d)", + sd_hash.hex()[:8], blob_peer.address, + blob_peer.udp_port or -1, blob_peer.tcp_port or -1) + self.probed_streams_metric.inc() + if found: + self.announced_streams_metric.inc() + if working: + self.working_streams_metric.inc() + log.info("Done querying stream %s for peers. Found: %s, working: %s", sd_hash.hex()[:8], found, working) + await asyncio.sleep(.5) + + @property + def refresh_limit(self): + return datetime.datetime.utcnow() - datetime.timedelta(hours=1) + + @property + def all_peers(self): + return [ + peer for peer in self._memory_peers.values() + if (peer.last_seen and peer.last_seen > self.refresh_limit) or (peer.latency or 0) > 0 + ] + + @property + def active_peers_count(self): + return len(self.all_peers) + + @property + def checked_peers_count(self): + return len([peer for peer in self.all_peers if peer.last_check and peer.last_check > self.refresh_limit]) + + @property + def unreachable_peers_count(self): + return len([peer for peer in self.all_peers + if peer.last_check and peer.last_check > self.refresh_limit and not peer.latency]) + + @property + def peers_with_errors_count(self): + return len([peer for peer in self.all_peers if (peer.errors or 0) > 0]) + + def get_peers_needing_check(self): + to_check = [peer for peer in self.all_peers if peer.last_check is None or peer.last_check < self.refresh_limit] + return to_check + + def remove_expired_peers(self): + for key, peer in list(self._memory_peers.items()): + if (peer.latency or 0) < 1 and peer.last_seen < self.refresh_limit: + del self._memory_peers[key] + + def add_peers(self, *peers): + for peer in peers: + db_peer = self.get_from_peer(peer) + if db_peer and db_peer.node_id is None and peer.node_id is not None: + db_peer = replace(db_peer, node_id=peer.node_id.hex()) + elif not db_peer: + db_peer = DHTPeer.from_kad_peer(peer, len(self._memory_peers) + 1) + db_peer = replace(db_peer, last_seen=datetime.datetime.utcnow()) + self._memory_peers[(peer.address, peer.udp_port)] = db_peer + + async def flush_to_db(self): + await self.db.save_peers(*self._memory_peers.values()) + connections_to_save = self._connections + self._connections = {} + # await self.db.save_connections(connections_to_save) heavy call + self.remove_expired_peers() + + def get_from_peer(self, peer): + return self._memory_peers.get((peer.address, peer.udp_port), None) + + def set_latency(self, peer, latency=None): + if latency: + self.host_latency_metric.observe(latency / 1_000_000.0) + db_peer = self.get_from_peer(peer) + if not db_peer: + return + db_peer = replace(db_peer, latency=latency) + if not db_peer.node_id and peer.node_id: + db_peer = replace(db_peer, node_id=peer.node_id.hex()) + if db_peer.first_online and latency is None: + db_peer = replace(db_peer, last_churn=(datetime.datetime.utcnow() - db_peer.first_online).seconds) + elif latency is not None and db_peer.first_online is None: + db_peer = replace(db_peer, first_online=datetime.datetime.utcnow()) + db_peer = replace(db_peer, last_check=datetime.datetime.utcnow()) + self._memory_peers[(db_peer.address, db_peer.udp_port)] = db_peer + + def inc_errors(self, peer): + db_peer = self.get_from_peer(peer) + self._memory_peers[(peer.address, peer.node_id)] = replace(db_peer, errors=(db_peer.errors or 0) + 1) + + def associate_peers(self, peer, other_peers): + self._connections[self.get_from_peer(peer).peer_id] = [ + self.get_from_peer(other_peer).peer_id for other_peer in other_peers] + + async def request_peers(self, host, port, node_id, key=None) -> typing.Optional[FindResponse]: + key = key or node_id + peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port) + for attempt in range(3): + try: + req_start = time.perf_counter_ns() + if key == node_id: + response = await self.node.protocol.get_rpc_peer(peer).find_node(key) + response = FindNodeResponse(key, response) + latency = time.perf_counter_ns() - req_start + self.set_latency(peer, latency) + else: + response = await self.node.protocol.get_rpc_peer(peer).find_value(key) + response = FindValueResponse(key, response) + await asyncio.sleep(0.05) + return response + except asyncio.TimeoutError: + if key == node_id: + self.set_latency(peer, None) + continue + except lbry.dht.error.TransportNotConnected: + log.info("Transport unavailable, waiting 1s to retry") + await asyncio.sleep(1) + except lbry.dht.error.RemoteException as e: + log.info('Peer errored: %s:%d attempt #%d - %s', + host, port, (attempt + 1), str(e)) + if key == node_id: + self.inc_errors(peer) + self.set_latency(peer, None) + continue + + async def crawl_routing_table(self, host, port, node_id=None): + start = time.time() + log.debug("querying %s:%d", host, port) + address = await resolve_host(host, port, 'udp') + key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + peer = make_kademlia_peer(key, address, port) + self.add_peers(peer) + if not key: + latency = None + for _ in range(3): + try: + ping_start = time.perf_counter_ns() + await self.node.protocol.get_rpc_peer(peer).ping() + await asyncio.sleep(0.05) + key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + peer = make_kademlia_peer(key, address, port) + latency = time.perf_counter_ns() - ping_start + break + except asyncio.TimeoutError: + pass + except lbry.dht.error.RemoteException: + self.inc_errors(peer) + pass + self.set_latency(peer, latency if peer.node_id else None) + if not latency or not peer.node_id: + if latency and not peer.node_id: + log.warning("No node id from %s:%d", host, port) + return set() + distance = Distance(key) + max_distance = int.from_bytes(bytes([0xff] * 48), 'big') + peers = set() + factor = 2048 + for i in range(1000): + response = await self.request_peers(address, port, key) + new_peers = list(response.get_close_kademlia_peers(peer)) if response else None + if not new_peers: + break + new_peers.sort(key=lambda peer: distance(peer.node_id)) + peers.update(new_peers) + far_key = new_peers[-1].node_id + if distance(far_key) <= distance(key): + current_distance = distance(key) + next_jump = current_distance + int(max_distance // factor) # jump closer + factor /= 2 + if factor > 8 and next_jump < max_distance: + key = int.from_bytes(peer.node_id, 'big') ^ next_jump + if key.bit_length() > 384: + break + key = key.to_bytes(48, 'big') + else: + break + else: + key = far_key + factor = 2048 + if peers: + log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", + host, port, (time.time() - start), len(peers), i) + if peers: + self.connections_found_metric.observe(len(peers)) + known_peers = 0 + reachable_connections = 0 + for peer in peers: + known_peer = self.get_from_peer(peer) + known_peers += 1 if known_peer else 0 + reachable_connections += 1 if known_peer and (known_peer.latency or 0) > 0 else 0 + self.known_connections_found_metric.observe(known_peers) + self.reachable_connections_found_metric.observe(reachable_connections) + self.add_peers(*peers) + self.associate_peers(peer, peers) + return peers + + async def process(self): + to_process = {} + + def submit(_peer): + f = asyncio.ensure_future( + self.crawl_routing_table(_peer.address, _peer.udp_port, bytes.fromhex(_peer.node_id))) + to_process[_peer.peer_id] = f + f.add_done_callback(lambda _: to_process.pop(_peer.peer_id)) + + to_check = self.get_peers_needing_check() + last_flush = datetime.datetime.utcnow() + while True: + for peer in to_check[:200]: + if peer.peer_id not in to_process: + submit(peer) + await asyncio.sleep(.05) + await asyncio.sleep(0) + self.unique_total_hosts_metric.set(self.checked_peers_count) + self.reachable_hosts_metric.set(self.checked_peers_count - self.unreachable_peers_count) + self.total_historic_hosts_metric.set(len(self._memory_peers)) + self.pending_check_hosts_metric.set(len(to_check)) + self.hosts_with_errors_metric.set(self.peers_with_errors_count) + log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", + self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count, + self.peers_with_errors_count, len(to_process), len(to_check)) + if to_process: + await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) + to_check = self.get_peers_needing_check() + if (datetime.datetime.utcnow() - last_flush).seconds > 60: + log.info("flushing to db") + await self.flush_to_db() + last_flush = datetime.datetime.utcnow() + while not to_check and not to_process: + port = self.node.listening_port.get_extra_info('socket').getsockname()[1] + self.node.stop() + await self.node.start_listening() + log.info("Idle, sleeping a minute. Port changed to %d", port) + await asyncio.sleep(60.0) + to_check = self.get_peers_needing_check() + + +class SimpleMetrics: + def __init__(self, port): + self.prometheus_port = port + + async def handle_metrics_get_request(self, _): + try: + return web.Response( + text=prom_generate_latest().decode(), + content_type='text/plain; version=0.0.4' + ) + except Exception: + log.exception('could not generate prometheus data') + raise + + async def start(self): + prom_app = web.Application() + prom_app.router.add_get('/metrics', self.handle_metrics_get_request) + metrics_runner = web.AppRunner(prom_app) + await metrics_runner.setup() + prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port) + await prom_site.start() + + +def dict_row_factory(cursor, row): + d = {} + for idx, col in enumerate(cursor.description): + if col[0] in ('added_on', 'first_online', 'last_seen', 'last_check'): + d[col[0]] = datetime.datetime.fromisoformat(row[idx]) if row[idx] else None + else: + d[col[0]] = row[idx] + return d + + +async def test(): + db_path = "/tmp/peers.db" if len(sys.argv) == 1 else sys.argv[-1] + asyncio.get_event_loop().set_debug(True) + metrics = SimpleMetrics('8080') + await metrics.start() + conf = Config() + hosting_samples = SDHashSamples("test.sample") if os.path.isfile("test.sample") else None + crawler = Crawler(db_path, hosting_samples) + await crawler.open() + await crawler.flush_to_db() + await crawler.node.start_listening() + if crawler.active_peers_count < 100: + probes = [] + for (host, port) in conf.known_dht_nodes: + probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port))) + await asyncio.gather(*probes) + await crawler.flush_to_db() + await asyncio.gather(crawler.process(), crawler.probe_files()) + +if __name__ == '__main__': + asyncio.run(test()) diff --git a/scripts/sd_hash_sampler.py b/scripts/sd_hash_sampler.py new file mode 100644 index 000000000..f651cd1be --- /dev/null +++ b/scripts/sd_hash_sampler.py @@ -0,0 +1,44 @@ +import asyncio +from typing import Iterable + +from lbry.extras.daemon.client import daemon_rpc +from lbry.conf import Config +conf = Config() + + +async def sample_prefix(prefix: bytes): + result = await daemon_rpc(conf, "claim_search", sd_hash=prefix.hex(), page_size=50) + total_pages = result['total_pages'] + print(total_pages) + sd_hashes = set() + for page in range(1, total_pages + 1): + if page > 1: + result = await daemon_rpc(conf, "claim_search", sd_hash=prefix.hex(), page=page, page_size=50) + for item in result['items']: + sd_hash = item.get('value', {}).get('source', {}).get('sd_hash') + if not sd_hash: + print('err', item) + continue + sd_hashes.add(sd_hash) + print('page', page, len(sd_hashes)) + return sd_hashes + + +def save_sample(name: str, samples: Iterable[str]): + with open(name, 'wb') as outfile: + for sample in samples: + outfile.write(bytes.fromhex(sample)) + outfile.flush() + print(outfile.tell()) + + +async def main(): + samples = set() + futs = [asyncio.ensure_future(sample_prefix(bytes([i]))) for i in range(256)] + for i, completed in enumerate(asyncio.as_completed(futs)): + samples.update(await completed) + print(i, len(samples)) + print(save_sample("test.sample", samples)) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file