diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index dfeb4795a..2fdc602eb 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -93,17 +93,14 @@ class IterativeFinder: self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) - self.closest_peer: typing.Optional['KademliaPeer'] = None - self.prev_closest_peer: typing.Optional['KademliaPeer'] = None - self.iteration_queue = asyncio.Queue(loop=self.loop) - self.running_probes: typing.Set[asyncio.Task] = set() + self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {} self.iteration_count = 0 self.bottom_out_count = 0 self.running = False self.tasks: typing.List[asyncio.Task] = [] - self.delayed_calls: typing.List[asyncio.Handle] = [] + self.delayed_call: asyncio.Handle = None for peer in get_shortlist(routing_table, key, shortlist): if peer.node_id: self._add_active(peer, force=True) @@ -111,21 +108,6 @@ class IterativeFinder: # seed nodes self._schedule_probe(peer) - @property - def is_closest_peer_ready(self): - if not self.closest_peer or not self.prev_closest_peer: - return False - return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) - - @property - def are_k_closest_peers_ready(self): - if not self.is_closest_peer_ready or len(self.active) < self.max_results: - return False - for peer in list(self.active.keys())[:self.max_results]: - if peer not in self.contacted or not self.peer_manager.peer_is_good(peer): - return False - return True - async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ Send the rpc request to the peer and return an object with the FindResponse interface @@ -153,28 +135,14 @@ class IterativeFinder: """ return [] - def _is_closer(self, peer: 'KademliaPeer') -> bool: - return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) - def _add_active(self, peer, force=False): if not force and self.peer_manager.peer_is_good(peer) is False: return if peer in self.contacted: return - if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False: - log.debug("[%s] closest peer went bad", self.key.hex()[:8]) - if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False: - log.debug("[%s] previous closest was bad too", self.key.hex()[:8]) - self.closest_peer = self.prev_closest_peer - else: - self.closest_peer = None - self.prev_closest_peer = None if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: self.active[peer] = self.distance(peer.node_id) self.active = dict(sorted(self.active.items(), key=lambda item: item[1])) - if self._is_closer(peer): - self.prev_closest_peer = self.closest_peer - self.closest_peer = peer async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) @@ -191,10 +159,6 @@ class IterativeFinder: def _reset_closest(self, peer): if peer in self.active: del self.active[peer] - if peer == self.prev_closest_peer: - self.prev_closest_peer = None - if peer == self.closest_peer: - self.closest_peer = self.prev_closest_peer async def _send_probe(self, peer: 'KademliaPeer'): try: @@ -213,7 +177,7 @@ class IterativeFinder: return return await self._handle_probe_result(peer, response) - async def _search_round(self): + def _search_round(self): """ Send up to constants.alpha (5) probes to closest active peers """ @@ -222,14 +186,11 @@ class IterativeFinder: for index, peer in enumerate(self.active.keys()): if index == 0: log.debug("closest to probe: %s", peer.node_id.hex()[:8]) - if self.closest_peer != peer: - self.prev_closest_peer = self.closest_peer - self.closest_peer = peer if peer in self.contacted: continue - if added >= constants.ALPHA: + if len(self.running_probes) >= constants.ALPHA: break - if index > self.max_results: + if index > (constants.K - 1): break origin_address = (peer.address, peer.udp_port) if origin_address in self.exclude: @@ -251,21 +212,21 @@ class IterativeFinder: t = self.loop.create_task(self._send_probe(peer)) def callback(_): - self.running_probes.difference_update({ - probe for probe in self.running_probes if probe.done() or probe == t - }) - if not self.running_probes: - self.tasks.append(self.loop.create_task(self._search_task(0.0))) + for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]: + del self.running_probes[peer] + self._search_task(0.0) t.add_done_callback(callback) - self.running_probes.add(t) + self.running_probes[peer] = t - async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): + def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): try: if self.running: - await self._search_round() - if self.running: - self.delayed_calls.append(self.loop.call_later(delay, self._search)) + if self.delayed_call: + self.delayed_call.cancel() # ensure anything scheduled gets cancelled + self._search_round() + #if self.running: + # self.delayed_call = self.loop.call_later(delay, self._search) except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected): if self.running: self.loop.call_soon(self.aclose) @@ -273,13 +234,9 @@ class IterativeFinder: def _log_state(self): log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count", self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count) - if self.closest_peer and self.prev_closest_peer: - log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s", - self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted, - self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8]) def _search(self): - self.tasks.append(self.loop.create_task(self._search_task())) + self._search_task() def __aiter__(self): if self.running: @@ -305,11 +262,11 @@ class IterativeFinder: def aclose(self): self.running = False self.iteration_queue.put_nowait(None) - for task in chain(self.tasks, self.running_probes, self.delayed_calls): + for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])): task.cancel() self.tasks.clear() self.running_probes.clear() - self.delayed_calls.clear() + self.delayed_call = None class IterativeNodeFinder(IterativeFinder): @@ -352,16 +309,6 @@ class IterativeNodeFinder(IterativeFinder): if found: log.debug("found") return self.put_result(self.active.keys(), finish=True) - elif self.is_closest_peer_ready: - self.bottom_out_count += 1 - else: - self.bottom_out_count = 0 - - if self.are_k_closest_peers_ready: - self.put_result(self.active.keys(), True) - elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: - log.warning("peer search bottomed out.") - self.put_result([], True) class IterativeValueFinder(IterativeFinder): @@ -427,14 +374,6 @@ class IterativeValueFinder(IterativeFinder): # log.info("enough blob peers found") # if not self.finished.is_set(): # self.finished.set() - elif self.is_closest_peer_ready: - self.bottom_out_count += 1 - if self.are_k_closest_peers_ready: - log.info("blob peer search finished for %s", self.key.hex()[:8]) - self.iteration_queue.put_nowait(None) - elif self.bottom_out_count >= self.bottom_out_limit: - log.info("blob peer search bottomed out for %s", self.key.hex()[:8]) - self.iteration_queue.put_nowait(None) def get_initial_result(self) -> typing.List['KademliaPeer']: if self.protocol.data_store.has_peers_for_blob(self.key):