forked from LBRYCommunity/lbry-sdk
dht_crawler: avoid reads
This commit is contained in:
parent
c6c27925b7
commit
61f7fbe230
1 changed file with 15 additions and 14 deletions
|
@@ -106,13 +106,17 @@ class Crawler:
|
||||||
sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()])
|
sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()])
|
||||||
|
|
||||||
def add_peers(self, *peers):
|
def add_peers(self, *peers):
|
||||||
|
db_peers = []
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
db_peer = self.get_from_peer(peer)
|
db_peer = self.get_from_peer(peer)
|
||||||
if db_peer and db_peer.node_id is None and peer.node_id:
|
if db_peer and db_peer.node_id is None and peer.node_id:
|
||||||
db_peer.node_id = peer.node_id
|
db_peer.node_id = peer.node_id
|
||||||
self.db.add(db_peer)
|
|
||||||
elif not db_peer:
|
elif not db_peer:
|
||||||
self.db.add(DHTPeer.from_kad_peer(peer))
|
db_peer = DHTPeer.from_kad_peer(peer)
|
||||||
|
self.db.add(db_peer)
|
||||||
|
db_peers.append(db_peer)
|
||||||
|
self.db.flush()
|
||||||
|
return [dbp.peer_id for dbp in db_peers]
|
||||||
|
|
||||||
def get_from_peer(self, peer):
|
def get_from_peer(self, peer):
|
||||||
return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
|
return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
|
||||||
|
@@ -137,18 +141,16 @@ class Crawler:
|
||||||
def count_peers(self):
|
def count_peers(self):
|
||||||
return self.db.query(DHTPeer).count()
|
return self.db.query(DHTPeer).count()
|
||||||
|
|
||||||
def associate_peers(self, target_peer, peers):
|
def associate_peers(self, target_peer_id, db_peer_ids):
|
||||||
db_peer = self.get_from_peer(target_peer)
|
|
||||||
connections = {
|
connections = {
|
||||||
DHTConnection(
|
DHTConnection(
|
||||||
from_peer_id=db_peer.peer_id,
|
from_peer_id=target_peer_id,
|
||||||
to_peer_id=self.get_from_peer(peer).peer_id)
|
to_peer_id=peer_id)
|
||||||
for peer in peers
|
for peer_id in db_peer_ids
|
||||||
}
|
}
|
||||||
all_peer_ids = {peer.node_id for peer in peers if peer.node_id}
|
self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(set(db_peer_ids))).update(
|
||||||
self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update(
|
|
||||||
{DHTPeer.last_seen: datetime.datetime.utcnow()})
|
{DHTPeer.last_seen: datetime.datetime.utcnow()})
|
||||||
self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete()
|
self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete()
|
||||||
self.db.add_all(connections)
|
self.db.add_all(connections)
|
||||||
|
|
||||||
async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
|
async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
|
||||||
|
@@ -172,7 +174,7 @@ class Crawler:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
log.info("querying %s:%d", host, port)
|
log.info("querying %s:%d", host, port)
|
||||||
address = await resolve_host(host, port, 'udp')
|
address = await resolve_host(host, port, 'udp')
|
||||||
self.add_peers(make_kademlia_peer(None, address, port))
|
this_peer_id, = self.add_peers(make_kademlia_peer(None, address, port))
|
||||||
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
||||||
latency = None
|
latency = None
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
|
@@ -221,9 +223,8 @@ class Crawler:
|
||||||
factor = 2048
|
factor = 2048
|
||||||
log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
|
log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
|
||||||
host, port, (time.time() - start), len(peers), i)
|
host, port, (time.time() - start), len(peers), i)
|
||||||
self.add_peers(*peers)
|
db_peer_ids = self.add_peers(*peers)
|
||||||
self.associate_peers(make_kademlia_peer(key, address, port), peers)
|
self.associate_peers(this_peer_id, db_peer_ids)
|
||||||
self.db.flush()
|
|
||||||
self.db.commit()
|
self.db.commit()
|
||||||
return peers
|
return peers
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue