use a dict for the active queue

This commit is contained in:
Victor Shyba 2022-02-08 19:57:17 -03:00
parent 2ed23fbc4b
commit b7b8831109

View file

@ -89,7 +89,7 @@ class IterativeFinder:
self.max_results = max_results
self.exclude = exclude or []
self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)
@ -119,11 +119,12 @@ class IterativeFinder:
@property
def are_k_closest_peers_ready(self):
if not self.is_closest_peer_ready:
if not self.is_closest_peer_ready or len(self.active) < self.max_results:
return False
to_probe = list(self.active)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
for peer in list(self.active.keys())[:self.max_results]:
if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
return False
return True
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
"""
@ -169,7 +170,8 @@ class IterativeFinder:
self.closest_peer = None
self.prev_closest_peer = None
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
self.active[peer] = self.distance(peer.node_id)
self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
@ -187,7 +189,8 @@ class IterativeFinder:
self._log_state()
def _reset_closest(self, peer):
self.active.discard(peer)
if peer in self.active:
del self.active[peer]
if peer == self.prev_closest_peer:
self.prev_closest_peer = None
if peer == self.closest_peer:
@ -216,10 +219,14 @@ class IterativeFinder:
"""
added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
for peer in to_probe:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if self.closest_peer != peer:
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
if peer in self.contacted:
continue
if added >= constants.ALPHA:
break
origin_address = (peer.address, peer.udp_port)
@ -320,7 +327,7 @@ class IterativeNodeFinder(IterativeFinder):
return FindNodeResponse(self.key, response)
def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
@ -342,17 +349,17 @@ class IterativeNodeFinder(IterativeFinder):
if found:
log.debug("found")
return self.put_result(self.active, finish=True)
return self.put_result(self.active.keys(), finish=True)
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
else:
self.bottom_out_count = 0
if self.are_k_closest_peers_ready:
self.put_result(self.active, True)
self.put_result(self.active.keys(), True)
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.debug("peer search bottomed out.")
self.put_result(self.active, True)
self.put_result(self.active.keys(), True)
class IterativeValueFinder(IterativeFinder):