dht_crawler: keep working set in memory, flush to db on intervals

Victor Shyba 2022-06-15 11:28:17 -03:00 committed by Victor Shyba
parent cd42f0d726
commit 508bdb8e94

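For orientation before the hunks: a minimal, self-contained sketch of the pattern named in the commit message, keeping the working set in an in-memory dict and flushing it to the database in one batch. Record, Store, and touch() are illustrative names, not code from this repository.

    import datetime
    import sqlalchemy as sqla
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    class Record(Base):
        __tablename__ = "records"
        key = sqla.Column(sqla.Text, primary_key=True)
        last_seen = sqla.Column(sqla.DateTime)

    class Store:
        def __init__(self, db_path: str):
            engine = sqla.create_engine(f"sqlite:///{db_path}")
            Base.metadata.create_all(engine)
            # expire_on_commit=False keeps cached objects usable after commit()
            self.db = sessionmaker(engine, expire_on_commit=False)()
            # load every row once; reads and writes then hit only this dict
            self._memory = {row.key: row for row in self.db.query(Record).all()}

        def touch(self, key: str):
            row = self._memory.get(key)
            if row is None:
                row = self._memory[key] = Record(key=key)
            row.last_seen = datetime.datetime.utcnow()  # mutates memory only

        def flush(self):
            self.db.add_all(self._memory.values())  # attach new + dirty rows
            self.db.commit()                        # one write for the batch

Per-peer commits become per-interval commits; the trade-off is that a crash loses at most one interval of updates.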

@@ -52,7 +52,7 @@ class DHTPeer(Base):
         return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port)

     def to_kad_peer(self):
-        node_id = bytes.fromhex(self.node_id.hex()) if self.node_id else None
+        node_id = bytes.fromhex(self.node_id) if self.node_id else None
         return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port)
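The first hunk is a type fix rather than a behavior tweak: node_id is persisted as hex text (add_peers stores peer.node_id.hex() into the column), so calling .hex() on it again would raise AttributeError, since str has no .hex() method. A tiny round-trip demonstration with a made-up id:

    raw = bytes.fromhex("deadbeef")      # a node id as raw bytes
    stored = raw.hex()                   # "deadbeef": what the column holds
    assert bytes.fromhex(stored) == raw  # the corrected round-trip
    # stored.hex() would raise AttributeError: 'str' object has no attribute 'hex'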
@@ -73,40 +73,46 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
 class Crawler:
     def __init__(self, db_path: str):
         self.node = new_node()
-        self.semaphore = asyncio.Semaphore(20)
+        self.semaphore = asyncio.Semaphore(200)
         engine = sqla.create_engine(f"sqlite:///{db_path}")
         Base.metadata.create_all(engine)
         session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False)
         self.db = session()
+        self._memory_peers = {
+            (peer.address, peer.udp_port): peer for peer in self.db.query(DHTPeer).all()
+        }

     @property
     def refresh_limit(self):
         return datetime.datetime.utcnow() - datetime.timedelta(hours=1)

     @property
-    def active_peers_query(self):
-        return self.db.query(DHTPeer).filter(sqla.or_(DHTPeer.last_seen > self.refresh_limit, DHTPeer.latency > 0))
+    def all_peers(self):
+        return [
+            peer for peer in self._memory_peers.values()
+            if (peer.last_seen and peer.last_seen > self.refresh_limit) or (peer.latency or 0) > 0
+        ]

     @property
-    def all_peers(self):
-        return set([peer.to_kad_peer() for peer in self.active_peers_query.all()])
+    def active_peers_count(self):
+        return len(self.all_peers)

     @property
     def checked_peers_count(self):
-        return self.active_peers_query.filter(DHTPeer.last_check > self.refresh_limit).count()
+        return len([peer for peer in self.all_peers if peer.last_check and peer.last_check > self.refresh_limit])

     @property
     def unreachable_peers_count(self):
-        return self.active_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > self.refresh_limit).count()
+        return len([peer for peer in self.all_peers
+                    if peer.last_check and peer.last_check > self.refresh_limit and not peer.latency])

     @property
     def peers_with_errors_count(self):
-        return self.active_peers_query.filter(DHTPeer.errors > 0).count()
+        return len([peer for peer in self.all_peers if (peer.errors or 0) > 0])

     def get_peers_needing_check(self):
-        return set([peer.to_kad_peer() for peer in self.active_peers_query.filter(
-            sqla.or_(DHTPeer.last_check == None,
-                     DHTPeer.last_check < self.refresh_limit)).order_by(DHTPeer.last_seen.desc()).all()])
+        to_check = [peer for peer in self.all_peers if peer.last_check is None or peer.last_check < self.refresh_limit]
+        return to_check

     def add_peers(self, *peers):
         db_peers = []
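The rewritten counters filter in Python instead of SQL, which changes how missing values behave: SQL comparisons silently drop NULL rows, while comparing None in Python raises TypeError, hence the explicit "peer.last_seen and ..." and "(peer.latency or 0)" guards. A small illustration, with P as a hypothetical stand-in for a never-checked DHTPeer row:

    import datetime

    class P:                 # stand-in for a row whose columns are still NULL
        last_seen = None
        latency = None

    peer = P()
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
    active = (peer.last_seen and peer.last_seen > cutoff) or (peer.latency or 0) > 0
    assert not active        # no TypeError; unset columns simply fail the filter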
@@ -116,13 +122,16 @@ class Crawler:
                 db_peer.node_id = peer.node_id.hex()
             elif not db_peer:
                 db_peer = DHTPeer.from_kad_peer(peer)
-                self.db.add(db_peer)
+                self._memory_peers[(peer.address, peer.udp_port)] = db_peer
             db_peer.last_seen = datetime.datetime.utcnow()
             db_peers.append(db_peer)
-        self.db.flush()
         return [dbp.peer_id for dbp in db_peers]
+
+    def flush_to_db(self):
+        self.db.add_all(self._memory_peers.values())
+        self.db.commit()

     def get_from_peer(self, peer):
-        return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
+        return self._memory_peers.get((peer.address, peer.udp_port), None)

     def set_latency(self, peer, latency=None):
         db_peer = self.get_from_peer(peer)
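flush_to_db() can hand the entire dict to add_all() every time: re-adding an object the session already manages is a no-op, brand-new objects are INSERTed, and modified ones are UPDATEd by the commit. Continuing the hypothetical Store sketch from above:

    store = Store("/tmp/example.db")
    store.touch("peer-a")   # new row: INSERT on the next flush
    store.flush()
    store.touch("peer-a")   # same object, now dirty: UPDATE on the next flush
    store.flush()
    assert store.db.query(Record).count() == 1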
@@ -134,27 +143,13 @@ class Crawler:
         elif latency is not None and db_peer.first_online is None:
             db_peer.first_online = datetime.datetime.utcnow()
         db_peer.last_check = datetime.datetime.utcnow()
-        self.db.add(db_peer)

     def inc_errors(self, peer):
         db_peer = self.get_from_peer(peer)
-        db_peer.errors += 1
-        self.db.add(db_peer)
-
-    def count_peers(self):
-        return self.db.query(DHTPeer).count()
+        db_peer.errors = (db_peer.errors or 0) + 1

     def associate_peers(self, target_peer_id, db_peer_ids):
-        connections = {
-            DHTConnection(
-                from_peer_id=target_peer_id,
-                to_peer_id=peer_id)
-            for peer_id in db_peer_ids
-        }
-        self.db.query(DHTPeer).filter(DHTPeer.peer_id.in_(set(db_peer_ids))).update(
-            {DHTPeer.last_seen: datetime.datetime.utcnow()})
-        self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete()
-        self.db.add_all(connections)
+        return # todo

     async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
         async with self.semaphore:
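With the per-call database work gone, the crawl rate is bounded only by the semaphore, which this commit raises from 20 to 200 concurrent requests. The same bounded-concurrency idiom in isolation, where probe() is a made-up stand-in for a UDP ping:

    import asyncio

    async def probe(host: str) -> float:
        await asyncio.sleep(0.01)          # pretend UDP round trip
        return 0.01

    async def check_all(hosts):
        sem = asyncio.Semaphore(200)       # at most 200 probes in flight

        async def guarded(host):
            async with sem:                # waits while 200 are outstanding
                return await probe(host)

        return await asyncio.gather(*(guarded(h) for h in hosts))

    asyncio.run(check_all([f"10.0.0.{i}" for i in range(1, 255)]))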
@ -177,7 +172,7 @@ class Crawler:
start = time.time()
log.info("querying %s:%d", host, port)
address = await resolve_host(host, port, 'udp')
this_peer_id, = self.add_peers(make_kademlia_peer(None, address, port))
self.add_peers(make_kademlia_peer(None, address, port))
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
latency = None
for _ in range(3):
@@ -225,8 +220,8 @@ class Crawler:
                 factor = 2048
         log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
                  host, port, (time.time() - start), len(peers), i)
-        db_peer_ids = self.add_peers(*peers)
-        self.associate_peers(this_peer_id, db_peer_ids)
+        self.add_peers(*peers)
+        #self.associate_peers(this_peer_id, db_peer_ids)
         self.db.commit()
         return peers
@@ -239,19 +234,25 @@ class Crawler:
             f.add_done_callback(lambda _: to_process.pop(_peer))

         to_check = self.get_peers_needing_check()
+        last_flush = datetime.datetime.utcnow()
         while True:
             for peer in to_check:
                 if peer not in to_process:
                     submit(peer)
-                if len(to_process) > 20:
-                    await asyncio.sleep(.1)
+                if len(to_process) > 100:
+                    break
+                await asyncio.sleep(0)
             log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue",
-                     self.active_peers_query.count(), self.checked_peers_count, self.unreachable_peers_count,
+                     self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count,
                      self.peers_with_errors_count, len(to_process), len(to_check))
             if to_process:
                 await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
             to_check = self.get_peers_needing_check()
+            if (datetime.datetime.utcnow() - last_flush).seconds > 60:
+                log.info("flushing to db")
+                self.flush_to_db()
+                last_flush = datetime.datetime.utcnow()
             while not to_check and not to_process:
                 log.info("Idle, sleeping a minute.")
                 await asyncio.sleep(60.0)
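The loop now drains work in bursts (break once 100 checks are in flight, asyncio.sleep(0) to yield between submissions) and flushes on a wall-clock interval. One spelling note on the timer: timedelta.seconds ignores the .days component, so it wraps after 24 hours; at a 60-second threshold that never matters in practice, but total_seconds() is the general-purpose form, as in this hypothetical helper:

    import datetime

    FLUSH_INTERVAL = 60.0  # seconds, matching the loop above

    def should_flush(last_flush: datetime.datetime) -> bool:
        elapsed = datetime.datetime.utcnow() - last_flush
        return elapsed.total_seconds() > FLUSH_INTERVAL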
@@ -262,7 +263,7 @@ async def test():
     crawler = Crawler("/tmp/a.db")
     await crawler.node.start_listening()
     conf = Config()
-    if crawler.active_peers_query.count() < 100:
+    if crawler.active_peers_count < 100:
         for (host, port) in conf.known_dht_nodes:
             await crawler.crawl_routing_table(host, port)
     await crawler.process()