From 13af7800c2c5c20633744b165699cb1eb0b20015 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 5 Jul 2022 01:35:44 -0300
Subject: [PATCH] refactor script, remove dep

---
 docker/Dockerfile.dht_node |   2 +-
 scripts/dht_crawler.py     | 256 ++++++++++++++++++++++---------------
 2 files changed, 155 insertions(+), 103 deletions(-)

diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node
index 0b7932a84..9399911d9 100644
--- a/docker/Dockerfile.dht_node
+++ b/docker/Dockerfile.dht_node
@@ -31,7 +31,7 @@ RUN chown -R $user:$user $projects_dir
 USER $user
 WORKDIR $projects_dir
 
-RUN python3 -m pip install -U setuptools pip sqlalchemy
+RUN python3 -m pip install -U setuptools pip
 RUN make install
 RUN python3 docker/set_build.py
 RUN rm ~/.cache -rf
diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
index bc8cee37e..7a76e1639 100644
--- a/scripts/dht_crawler.py
+++ b/scripts/dht_crawler.py
@@ -3,9 +3,10 @@ import logging
 import asyncio
 import time
 import typing
+from dataclasses import dataclass, astuple, replace
 
 from aiohttp import web
-from prometheus_client import Gauge, Counter, generate_latest as prom_generate_latest
+from prometheus_client import Gauge, generate_latest as prom_generate_latest
 
 import lbry.dht.error
 from lbry.dht.constants import generate_id
@@ -17,56 +18,93 @@ from lbry.conf import Config
 from lbry.utils import resolve_host
+from lbry.wallet.database import SQLiteMixin
 
-from sqlalchemy.orm import declarative_base, relationship
-import sqlalchemy as sqla
-
-
-@sqla.event.listens_for(sqla.engine.Engine, "connect")
-def set_sqlite_pragma(dbapi_connection, _):
-    cursor = dbapi_connection.cursor()
-    cursor.execute("PRAGMA journal_mode=WAL")
-    cursor.close()
-
-
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
 log = logging.getLogger(__name__)
-Base = declarative_base()
 
 
-class DHTPeer(Base):
-    __tablename__ = "peer"
-    peer_id = sqla.Column(sqla.Integer(), sqla.Identity(), primary_key=True)
-    node_id = sqla.Column(sqla.String(96))
-    address = sqla.Column(sqla.String())
-    udp_port = sqla.Column(sqla.Integer())
-    tcp_port = sqla.Column(sqla.Integer())
-    first_online = sqla.Column(sqla.DateTime())
-    errors = sqla.Column(sqla.Integer(), default=0)
-    last_churn = sqla.Column(sqla.Integer())
-    added_on = sqla.Column(sqla.DateTime(), nullable=False, default=datetime.datetime.utcnow)
-    last_check = sqla.Column(sqla.DateTime())
-    last_seen = sqla.Column(sqla.DateTime())
-    latency = sqla.Column(sqla.Integer())
-    endpoint_unique = sqla.UniqueConstraint("node_id", "udp_port")
+class PeerStorage(SQLiteMixin):
+    CREATE_TABLES_QUERY = """
+    PRAGMA JOURNAL_MODE=WAL;
+    CREATE TABLE IF NOT EXISTS peer (
+        peer_id INTEGER NOT NULL,
+        node_id VARCHAR(96),
+        address VARCHAR,
+        udp_port INTEGER,
+        tcp_port INTEGER,
+        first_online DATETIME,
+        errors INTEGER,
+        last_churn INTEGER,
+        added_on DATETIME NOT NULL,
+        last_check DATETIME,
+        last_seen DATETIME,
+        latency INTEGER,
+        PRIMARY KEY (peer_id)
+    );
+    CREATE TABLE IF NOT EXISTS connection (
+        from_peer_id INTEGER NOT NULL,
+        to_peer_id INTEGER NOT NULL,
+        PRIMARY KEY (from_peer_id, to_peer_id),
+        FOREIGN KEY(from_peer_id) REFERENCES peer (peer_id),
+        FOREIGN KEY(to_peer_id) REFERENCES peer (peer_id)
+    );
+"""
+
+    async def open(self):
+        await super().open()
+        self.db.writer_connection.row_factory = dict_row_factory
+
+    async def all_peers(self):
+        return [DHTPeer(**peer) for peer in await self.db.execute_fetchall("select * from peer")]
+
+    async def save_peers(self, *peers):
+        log.info("Saving graph nodes (peers) to DB")
+        await self.db.executemany(
+            "INSERT OR REPLACE INTO peer("
+            "node_id, address, udp_port, tcp_port, first_online, errors, last_churn,"
+            "added_on, last_check, last_seen, latency, peer_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
+            [astuple(peer) for peer in peers]
+        )
+        log.info("Finished saving graph nodes (peers) to DB")
+
+    async def save_connections(self, connections_map):
+        log.info("Saving graph edges (connections) to DB")
+        await self.db.executemany(
+            "DELETE FROM connection WHERE from_peer_id = ?", [(key,) for key in connections_map])
+        for from_peer_id in connections_map:
+            await self.db.executemany(
+                "INSERT INTO connection(from_peer_id, to_peer_id) VALUES(?,?)",
+                [(from_peer_id, to_peer_id) for to_peer_id in connections_map[from_peer_id]])
+        log.info("Finished saving graph edges (connections) to DB")
+
+
+@dataclass(frozen=True)
+class DHTPeer:
+    node_id: str
+    address: str
+    udp_port: int
+    tcp_port: int = None
+    first_online: datetime.datetime = None
+    errors: int = None
+    last_churn: int = None
+    added_on: datetime.datetime = None
+    last_check: datetime.datetime = None
+    last_seen: datetime.datetime = None
+    latency: int = None
+    peer_id: int = None
 
     @classmethod
-    def from_kad_peer(cls, peer):
+    def from_kad_peer(cls, peer, peer_id):
         node_id = peer.node_id.hex() if peer.node_id else None
-        return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port)
+        return DHTPeer(
+            node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port,
+            peer_id=peer_id, added_on=datetime.datetime.utcnow())
 
     def to_kad_peer(self):
         node_id = bytes.fromhex(self.node_id) if self.node_id else None
         return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port)
 
 
-class DHTConnection(Base):
-    __tablename__ = "connection"
-    from_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True)
-    connected_by = relationship("DHTPeer", backref="known_by", primaryjoin=(DHTPeer.peer_id == from_peer_id))
-    to_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True)
-    connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id))
-
-
 def new_node(address="0.0.0.0", udp_port=0, node_id=None):
     node_id = node_id or generate_id()
     loop = asyncio.get_event_loop()
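PeerStorage builds on lbry's SQLiteMixin (the schema-owning base class in lbry.wallet.database, which wraps an AIOSQLite writer connection): CREATE_TABLES_QUERY runs when the store is opened, and save_peers relies on astuple() emitting dataclass fields in declaration order so they line up with the INSERT column list. A minimal sketch of that ordering contract with the stdlib driver (Row and table t are illustrative names, not from the patch):

    import sqlite3
    from dataclasses import dataclass, astuple

    @dataclass(frozen=True)
    class Row:
        # declaration order is the contract: astuple() emits values in this
        # order, so it must match the INSERT column list below
        node_id: str
        address: str
        row_id: int = None

    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE t (node_id TEXT, address TEXT, row_id INTEGER PRIMARY KEY)")
    db.executemany("INSERT OR REPLACE INTO t(node_id, address, row_id) VALUES (?,?,?)",
                   [astuple(Row("ab" * 48, "1.2.3.4", 1)),
                    astuple(Row("cd" * 48, "1.2.3.4", 1))])  # same key: second replaces first
    print(db.execute("SELECT * FROM t").fetchall())  # one row, with the updated node_id

INSERT OR REPLACE is what lets flush_to_db re-save the whole in-memory peer map every minute without tracking which rows are new.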
@@ -102,15 +139,17 @@ class Crawler:
         "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node",
         labelnames=("host", "port")
     )
+
     def __init__(self, db_path: str):
         self.node = new_node()
-        self.semaphore = asyncio.Semaphore(200)
-        engine = sqla.create_engine(f"sqlite:///{db_path}")
-        Base.metadata.create_all(engine)
-        session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False)
-        self.db = session()
+        self.db = PeerStorage(db_path)
+        self._memory_peers = {}
+        self._connections = {}
+
+    async def open(self):
+        await self.db.open()
         self._memory_peers = {
-            (peer.address, peer.udp_port): peer for peer in self.db.query(DHTPeer).all()
+            (peer.address, peer.udp_port): peer for peer in await self.db.all_peers()
         }
 
     @property
@@ -146,20 +185,20 @@ class Crawler:
         return to_check
 
     def add_peers(self, *peers):
-        db_peers = []
         for peer in peers:
             db_peer = self.get_from_peer(peer)
-            if db_peer and db_peer.node_id is None and peer.node_id:
-                db_peer.node_id = peer.node_id.hex()
+            if db_peer and db_peer.node_id is None and peer.node_id is not None:
+                db_peer = replace(db_peer, node_id=peer.node_id.hex())
             elif not db_peer:
-                db_peer = DHTPeer.from_kad_peer(peer)
-                self._memory_peers[(peer.address, peer.udp_port)] = db_peer
-            db_peer.last_seen = datetime.datetime.utcnow()
-            db_peers.append(db_peer)
+                db_peer = DHTPeer.from_kad_peer(peer, len(self._memory_peers) + 1)
+            db_peer = replace(db_peer, last_seen=datetime.datetime.utcnow())
+            self._memory_peers[(peer.address, peer.udp_port)] = db_peer
 
-    def flush_to_db(self):
-        self.db.add_all(self._memory_peers.values())
-        self.db.commit()
+    async def flush_to_db(self):
+        await self.db.save_peers(*self._memory_peers.values())
+        connections_to_save = self._connections
+        self._connections = {}
+        await self.db.save_connections(connections_to_save)
 
     def get_from_peer(self, peer):
         return self._memory_peers.get((peer.address, peer.udp_port), None)
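With DHTPeer now a frozen dataclass, every update in add_peers (and in set_latency below) goes through dataclasses.replace, which builds a copy of the record with one field changed and re-stores it in _memory_peers. The pattern in isolation (shortened field set, illustrative only):

    from dataclasses import dataclass, replace

    @dataclass(frozen=True)
    class Peer:
        address: str
        udp_port: int
        last_seen: str = None

    p1 = Peer("1.2.3.4", 4444)
    # p1.last_seen = "..." would raise FrozenInstanceError; build a copy instead:
    p2 = replace(p1, last_seen="2022-07-05T01:35:44")
    assert p1.last_seen is None and p2.last_seen == "2022-07-05T01:35:44"

Because the copy is a new object, it has to be written back into the dict; forgetting the write-back silently drops the update.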
@@ -167,71 +206,73 @@ class Crawler:
     def set_latency(self, peer, latency=None):
         if latency:
             self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency)
-        db_peer = self.get_from_peer(peer)
-        db_peer.latency = latency
+        db_peer = replace(self.get_from_peer(peer), latency=latency)
         if not db_peer.node_id and peer.node_id:
-            db_peer.node_id = peer.node_id.hex()
+            db_peer = replace(db_peer, node_id=peer.node_id.hex())
         if db_peer.first_online and latency is None:
-            db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds
+            db_peer = replace(db_peer, last_churn=(datetime.datetime.utcnow() - db_peer.first_online).seconds)
         elif latency is not None and db_peer.first_online is None:
-            db_peer.first_online = datetime.datetime.utcnow()
-        db_peer.last_check = datetime.datetime.utcnow()
+            db_peer = replace(db_peer, first_online=datetime.datetime.utcnow())
+        db_peer = replace(db_peer, last_check=datetime.datetime.utcnow())
+        self._memory_peers[(db_peer.address, db_peer.udp_port)] = db_peer
 
     def inc_errors(self, peer):
         db_peer = self.get_from_peer(peer)
-        db_peer.errors = (db_peer.errors or 0) + 1
+        self._memory_peers[(peer.address, peer.udp_port)] = replace(db_peer, errors=(db_peer.errors or 0) + 1)
 
-    def associate_peers(self, target_peer_id, db_peer_ids):
-        return  # todo
+    def associate_peers(self, peer, other_peers):
+        self._connections[self.get_from_peer(peer).peer_id] = [
+            self.get_from_peer(other_peer).peer_id for other_peer in other_peers]
 
     async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
-        async with self.semaphore:
-            peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port)
-            for attempt in range(3):
-                try:
-                    req_start = time.perf_counter_ns()
-                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
-                    latency = time.perf_counter_ns() - req_start
-                    self.set_latency(make_kademlia_peer(key, host, port), latency)
-                    return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
-                except asyncio.TimeoutError:
-                    self.set_latency(make_kademlia_peer(key, host, port), None)
-                    continue
-                except lbry.dht.error.RemoteException as e:
-                    log.info('Peer errored: %s:%d attempt #%d - %s',
-                             host, port, (attempt + 1), str(e))
-                    self.inc_errors(peer)
-                    self.set_latency(make_kademlia_peer(key, host, port), None)
-                    continue
+        peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port)
+        for attempt in range(3):
+            try:
+                req_start = time.perf_counter_ns()
+                response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                latency = time.perf_counter_ns() - req_start
+                await asyncio.sleep(0.05)
+                self.set_latency(peer, latency)
+                return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
+            except asyncio.TimeoutError:
+                self.set_latency(peer, None)
+                continue
+            except lbry.dht.error.RemoteException as e:
+                log.info('Peer errored: %s:%d attempt #%d - %s',
+                         host, port, (attempt + 1), str(e))
+                self.inc_errors(peer)
+                self.set_latency(peer, None)
+                continue
         return []
 
     async def crawl_routing_table(self, host, port, node_id=None):
         start = time.time()
         log.info("querying %s:%d", host, port)
         address = await resolve_host(host, port, 'udp')
-        self.add_peers(make_kademlia_peer(None, address, port))
         key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+        peer = make_kademlia_peer(key, address, port)
+        self.add_peers(peer)
         if not key:
             latency = None
             for _ in range(3):
                 try:
                     ping_start = time.perf_counter_ns()
-                    async with self.semaphore:
-                        await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
-                        key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
-                    latency = time.perf_counter_ns() - ping_start
+                    await self.node.protocol.get_rpc_peer(peer).ping()
+                    latency = time.perf_counter_ns() - ping_start
+                    await asyncio.sleep(0.05)
+                    key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+                    peer = make_kademlia_peer(key, address, port)
                     break
                 except asyncio.TimeoutError:
                     pass
                 except lbry.dht.error.RemoteException:
-                    self.inc_errors(make_kademlia_peer(None, address, port))
+                    self.inc_errors(peer)
                     pass
-        self.set_latency(make_kademlia_peer(key, address, port), latency if key else None)
-        if not latency or not key:
-            if latency and not key:
+        self.set_latency(peer, latency if peer.node_id else None)
+        if not latency or not peer.node_id:
+            if latency and not peer.node_id:
                 log.warning("No node id from %s:%d", host, port)
             return set()
-        node_id = key
         distance = Distance(key)
         max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
         peers = set()
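request_peers times each RPC with time.perf_counter_ns() and then sleeps 50 ms as a crude rate limit; the sleep sits outside the timed window so it does not inflate host_latency. The shape of that measurement, runnable standalone (slow_echo is a stand-in for the find_node RPC, not part of the patch):

    import asyncio
    import time

    async def slow_echo(value):
        await asyncio.sleep(0.03)  # stand-in for ~30 ms of network round trip
        return value

    async def timed_call():
        req_start = time.perf_counter_ns()
        result = await slow_echo("pong")             # only the request is timed
        latency = time.perf_counter_ns() - req_start
        await asyncio.sleep(0.05)                    # throttle, excluded from latency
        return result, latency / 1_000_000           # nanoseconds -> milliseconds

    print(asyncio.run(timed_call()))                 # ('pong', ~30.0)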
@@ -248,7 +289,7 @@
             next_jump = current_distance + int(max_distance // factor)  # jump closer
             factor /= 2
             if factor > 8 and next_jump < max_distance:
-                key = int.from_bytes(node_id, 'big') ^ next_jump
+                key = int.from_bytes(peer.node_id, 'big') ^ next_jump
                 if key.bit_length() > 384:
                     break
                 key = key.to_bytes(48, 'big')
@@ -262,8 +303,7 @@
         self.add_peers(*peers)
         if peers:
             self.connections_found_metric.labels(host=host, port=port).set(len(peers))
-        #self.associate_peers(this_peer_id, db_peer_ids)
-        self.db.commit()
+        self.associate_peers(peer, peers)
         return peers
 
     async def process(self):
@@ -271,19 +311,17 @@
 
         def submit(_peer):
             f = asyncio.ensure_future(
-                self.crawl_routing_table(_peer.address, peer.udp_port, bytes.fromhex(peer.node_id)))
-            to_process[_peer] = f
-            f.add_done_callback(lambda _: to_process.pop(_peer))
+                self.crawl_routing_table(_peer.address, _peer.udp_port, bytes.fromhex(_peer.node_id)))
+            to_process[_peer.peer_id] = f
+            f.add_done_callback(lambda _: to_process.pop(_peer.peer_id))
 
         to_check = self.get_peers_needing_check()
         last_flush = datetime.datetime.utcnow()
         while True:
-            for peer in to_check:
-                if peer not in to_process:
+            for peer in to_check[:200]:
+                if peer.peer_id not in to_process:
                     submit(peer)
-                    await asyncio.sleep(.1)
-            if len(to_process) > 100:
-                break
+                    await asyncio.sleep(.05)
             await asyncio.sleep(0)
             self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count)
             self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count)
@@ -298,7 +336,7 @@
             to_check = self.get_peers_needing_check()
             if (datetime.datetime.utcnow() - last_flush).seconds > 60:
                 log.info("flushing to db")
-                self.flush_to_db()
+                await self.flush_to_db()
                 last_flush = datetime.datetime.utcnow()
             while not to_check and not to_process:
                 port = self.node.listening_port.get_extra_info('socket').getsockname()[1]
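The key = int.from_bytes(peer.node_id, 'big') ^ next_jump line picks the next find_node target by XOR arithmetic: XOR-ing the remote node's id with a jump value yields a key at that Kademlia distance, and halving factor each round enlarges the jump so successive queries sweep farther buckets. A toy version with 4-byte ids (the real crawler uses 48-byte ids, hence the 384-bit guard; the starting factor here is illustrative, the patch does not show it):

    import os

    ID_BYTES = 4  # toy size; the DHT uses 48-byte node ids
    MAX_DISTANCE = int.from_bytes(bytes([0xff] * ID_BYTES), 'big')

    node_id = int.from_bytes(os.urandom(ID_BYTES), 'big')
    current_distance = 0
    factor = 2048.0  # assumed starting value for illustration
    while factor > 8:
        next_jump = current_distance + int(MAX_DISTANCE // factor)  # larger jump each round
        factor /= 2
        if next_jump < MAX_DISTANCE:
            key = node_id ^ next_jump  # a target at roughly that XOR distance
            print(f"factor={factor:7.1f} target={key.to_bytes(ID_BYTES, 'big').hex()}")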
@@ -332,10 +370,23 @@
         await prom_site.start()
 
 
+def dict_row_factory(cursor, row):
+    d = {}
+    for idx, col in enumerate(cursor.description):
+        if col[0] in ('added_on', 'first_online', 'last_seen', 'last_check'):
+            d[col[0]] = datetime.datetime.fromisoformat(row[idx]) if row[idx] else None
+        else:
+            d[col[0]] = row[idx]
+    return d
+
+
 async def test():
+    asyncio.get_event_loop().set_debug(True)
     metrics = SimpleMetrics('8080')
     await metrics.start()
     crawler = Crawler("/tmp/a.db")
+    await crawler.open()
+    await crawler.flush_to_db()
     await crawler.node.start_listening()
     conf = Config()
     if crawler.active_peers_count < 100:
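dict_row_factory follows sqlite3's row_factory hook signature (cursor, row): it rebuilds each row as a dict keyed by column name and revives the four datetime columns from the ISO-8601 text SQLite stores, which is what lets all_peers() do DHTPeer(**peer). The same factory exercised against the stdlib driver (table t is an illustrative name):

    import datetime
    import sqlite3

    def dict_row_factory(cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            if col[0] in ('added_on', 'first_online', 'last_seen', 'last_check'):
                d[col[0]] = datetime.datetime.fromisoformat(row[idx]) if row[idx] else None
            else:
                d[col[0]] = row[idx]
        return d

    db = sqlite3.connect(":memory:")
    db.row_factory = dict_row_factory
    db.execute("CREATE TABLE t (node_id TEXT, added_on DATETIME)")
    db.execute("INSERT INTO t VALUES (?, ?)", ("ab" * 48, datetime.datetime.utcnow().isoformat()))
    print(db.execute("SELECT * FROM t").fetchone())  # {'node_id': 'abab...', 'added_on': datetime.datetime(...)}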