diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index 31e6ab56a..62c13cd5d 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -89,7 +89,7 @@ class IterativeFinder: self.max_results = max_results self.exclude = exclude or [] - self.active: typing.Set['KademliaPeer'] = set() + self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) @@ -119,11 +119,12 @@ class IterativeFinder: @property def are_k_closest_peers_ready(self): - if not self.is_closest_peer_ready: + if not self.is_closest_peer_ready or len(self.active) < self.max_results: return False - to_probe = list(self.active) - to_probe.sort(key=lambda peer: self.distance(peer.node_id)) - return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results]) + for peer in list(self.active.keys())[:self.max_results]: + if peer not in self.contacted or not self.peer_manager.peer_is_good(peer): + return False + return True async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ @@ -169,7 +170,8 @@ class IterativeFinder: self.closest_peer = None self.prev_closest_peer = None if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: - self.active.add(peer) + self.active[peer] = self.distance(peer.node_id) + self.active = dict(sorted(self.active.items(), key=lambda item: item[1])) if self._is_closer(peer): self.prev_closest_peer = self.closest_peer self.closest_peer = peer @@ -187,7 +189,8 @@ class IterativeFinder: self._log_state() def _reset_closest(self, peer): - self.active.discard(peer) + if peer in self.active: + del self.active[peer] if peer == self.prev_closest_peer: self.prev_closest_peer = None if peer == self.closest_peer: @@ -216,10 +219,14 @@ class IterativeFinder: """ added = 0 - to_probe = list(self.active - self.contacted) - to_probe.sort(key=lambda peer: self.distance(peer.node_id)) - log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None) - for peer in to_probe: + for index, peer in enumerate(self.active.keys()): + if index == 0: + log.debug("closest to probe: %s", peer.node_id.hex()[:8]) + if self.closest_peer != peer: + self.prev_closest_peer = self.closest_peer + self.closest_peer = peer + if peer in self.contacted: + continue if added >= constants.ALPHA: break origin_address = (peer.address, peer.udp_port) @@ -320,7 +327,7 @@ class IterativeNodeFinder(IterativeFinder): return FindNodeResponse(self.key, response) def search_exhausted(self): - self.put_result(self.active, finish=True) + self.put_result(self.active.keys(), finish=True) def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): not_yet_yielded = [ @@ -342,17 +349,17 @@ class IterativeNodeFinder(IterativeFinder): if found: log.debug("found") - return self.put_result(self.active, finish=True) + return self.put_result(self.active.keys(), finish=True) elif self.is_closest_peer_ready: self.bottom_out_count += 1 else: self.bottom_out_count = 0 if self.are_k_closest_peers_ready: - self.put_result(self.active, True) + self.put_result(self.active.keys(), True) elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: log.debug("peer search bottomed out.") - self.put_result(self.active, True) + self.put_result(self.active.keys(), True) class IterativeValueFinder(IterativeFinder):