diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index fc523f480..5f5ca0ad4 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -85,22 +85,19 @@ class IterativeFinder: self.protocol = protocol self.key = key - self.bottom_out_limit = bottom_out_limit - self.max_results = max_results + self.max_results = max(constants.K, max_results) self.exclude = exclude or [] self.active: typing.Set['KademliaPeer'] = set() self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) - self.closest_peer: typing.Optional['KademliaPeer'] = None - self.prev_closest_peer: typing.Optional['KademliaPeer'] = None + self.closest_k_ids: typing.List[bytes] = [] self.iteration_queue = asyncio.Queue(loop=self.loop) self.running_probes: typing.Set[asyncio.Task] = set() self.iteration_count = 0 - self.bottom_out_count = 0 self.running = False self.tasks: typing.List[asyncio.Task] = [] self.delayed_calls: typing.List[asyncio.Handle] = [] @@ -139,14 +136,16 @@ class IterativeFinder: return [] def _is_closer(self, peer: 'KademliaPeer') -> bool: - return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) + if len(self.closest_k_ids) < self.max_results: + return True + return self.distance.is_closer(peer.node_id, self.closest_k_ids[self.max_results - 1]) def _add_active(self, peer): if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: self.active.add(peer) if self._is_closer(peer): - self.prev_closest_peer = self.closest_peer - self.closest_peer = peer + self.closest_k_ids.append(peer.node_id) + self.closest_k_ids.sort(key=lambda sorting_peer: self.distance(self.key)) async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) @@ -285,7 +284,7 @@ class IterativeNodeFinder(IterativeFinder): and self.peer_manager.peer_is_good(peer) is not False ] not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) - to_yield = not_yet_yielded[:min(constants.K, len(not_yet_yielded))] + to_yield = not_yet_yielded[:self.max_results] if to_yield: self.yielded_peers.update(to_yield) self.iteration_queue.put_nowait(to_yield) @@ -297,17 +296,7 @@ class IterativeNodeFinder(IterativeFinder): if found: log.debug("found") - return self.put_result(self.active, finish=True) - if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer): - # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted), - # self.bottom_out_count, self.iteration_count) - self.bottom_out_count = 0 - elif self.prev_closest_peer and self.closest_peer: - self.bottom_out_count += 1 - log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count) - if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: - log.info("limit hit") - self.put_result(self.active, True) + return self.put_result(self.active.union(self.contacted), finish=True) class IterativeValueFinder(IterativeFinder): @@ -359,7 +348,6 @@ class IterativeValueFinder(IterativeFinder): blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) for compact_addr in response.found_compact_addresses] to_yield = [] - self.bottom_out_count = 0 for blob_peer in blob_peers: if blob_peer not in self.blob_peers: self.blob_peers.add(blob_peer) @@ -371,11 +359,6 @@ class IterativeValueFinder(IterativeFinder): # log.info("enough blob peers found") # if not self.finished.is_set(): # self.finished.set() - elif self.prev_closest_peer and self.closest_peer: - self.bottom_out_count += 1 - if self.bottom_out_count >= self.bottom_out_limit: - log.info("blob peer search bottomed out") - self.iteration_queue.put_nowait(None) def get_initial_result(self) -> typing.List['KademliaPeer']: if self.protocol.data_store.has_peers_for_blob(self.key):