From 61f7fbe23036bcdb4a8cba6df2986b30aa3b84b1 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 11 Jun 2022 19:58:26 -0300
Subject: [PATCH] dht_crawler: avoid reads

---
 scripts/dht_crawler.py | 29 +++++++++++++++--------------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
index 57914ac38..44b50466e 100644
--- a/scripts/dht_crawler.py
+++ b/scripts/dht_crawler.py
@@ -106,13 +106,17 @@ class Crawler:
             sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()])
 
     def add_peers(self, *peers):
+        db_peers = []
         for peer in peers:
             db_peer = self.get_from_peer(peer)
             if db_peer and db_peer.node_id is None and peer.node_id:
                 db_peer.node_id = peer.node_id
-                self.db.add(db_peer)
             elif not db_peer:
-                self.db.add(DHTPeer.from_kad_peer(peer))
+                db_peer = DHTPeer.from_kad_peer(peer)
+                self.db.add(db_peer)
+            db_peers.append(db_peer)
+        self.db.flush()
+        return [dbp.peer_id for dbp in db_peers]
 
     def get_from_peer(self, peer):
         return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
@@ -137,18 +141,16 @@ class Crawler:
     def count_peers(self):
         return self.db.query(DHTPeer).count()
 
-    def associate_peers(self, target_peer, peers):
-        db_peer = self.get_from_peer(target_peer)
+    def associate_peers(self, target_peer_id, db_peer_ids):
         connections = {
             DHTConnection(
-                from_peer_id=db_peer.peer_id,
-                to_peer_id=self.get_from_peer(peer).peer_id)
-            for peer in peers
+                from_peer_id=target_peer_id,
+                to_peer_id=peer_id)
+            for peer_id in db_peer_ids
         }
-        all_peer_ids = {peer.node_id for peer in peers if peer.node_id}
-        self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update(
+        self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(set(db_peer_ids))).update(
             {DHTPeer.last_seen: datetime.datetime.utcnow()})
-        self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete()
+        self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete()
         self.db.add_all(connections)
 
     async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
@@ -172,7 +174,7 @@ class Crawler:
         start = time.time()
         log.info("querying %s:%d", host, port)
         address = await resolve_host(host, port, 'udp')
-        self.add_peers(make_kademlia_peer(None, address, port))
+        this_peer_id, = self.add_peers(make_kademlia_peer(None, address, port))
         key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
         latency = None
         for _ in range(3):
@@ -221,9 +223,8 @@ class Crawler:
             factor = 2048
         log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
                  host, port, (time.time() - start), len(peers), i)
-        self.add_peers(*peers)
-        self.associate_peers(make_kademlia_peer(key, address, port), peers)
-        self.db.flush()
+        db_peer_ids = self.add_peers(*peers)
+        self.associate_peers(this_peer_id, db_peer_ids)
         self.db.commit()
         return peers
 