diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index ad0f03863..31af402c3 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -115,7 +115,15 @@ class IterativeFinder: def is_closest_peer_ready(self): if not self.closest_peer or not self.prev_closest_peer: return False - return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False + return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) + + @property + def are_k_closest_peers_ready(self): + if not self.is_closest_peer_ready: + return False + to_probe = list(self.active) + to_probe.sort(key=lambda peer: self.distance(peer.node_id)) + return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results]) async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ @@ -150,6 +158,8 @@ class IterativeFinder: def _add_active(self, peer, force=False): if not force and self.peer_manager.peer_is_good(peer) is False: return + if peer in self.contacted: + return if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False: log.debug("[%s] closest peer went bad", self.key.hex()[:8]) if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False: @@ -177,6 +187,7 @@ class IterativeFinder: self._log_state() def _reset_closest(self, peer): + self.active.discard(peer) if peer == self.prev_closest_peer: self.prev_closest_peer = None if peer == self.closest_peer: @@ -187,16 +198,15 @@ class IterativeFinder: response = await self.send_probe(peer) except asyncio.TimeoutError: self._reset_closest(peer) - self.active.discard(peer) return except ValueError as err: log.warning(str(err)) self._reset_closest(peer) - self.active.discard(peer) return except TransportNotConnected: return self.aclose() except RemoteException: + self._reset_closest(peer) return return await self._handle_probe_result(peer, response) @@ -338,10 +348,11 @@ class IterativeNodeFinder(IterativeFinder): else: self.bottom_out_count = 0 - if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: - log.debug("limit hit") + if self.are_k_closest_peers_ready: + self.put_result(self.active, True) + elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: + log.info("peer search bottomed out.") self.put_result(self.active, True) - class IterativeValueFinder(IterativeFinder): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', @@ -408,7 +419,10 @@ class IterativeValueFinder(IterativeFinder): # self.finished.set() elif self.is_closest_peer_ready: self.bottom_out_count += 1 - if self.bottom_out_count >= self.bottom_out_limit: + if self.are_k_closest_peers_ready: + log.info("blob peer search finished.") + self.iteration_queue.put_nowait(None) + elif self.bottom_out_count >= self.bottom_out_limit: log.info("blob peer search bottomed out") self.iteration_queue.put_nowait(None)