diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index bbfc93bd6..409efcdb4 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -98,7 +98,6 @@ class IterativeFinder: self.iteration_count = 0 self.running = False self.tasks: typing.List[asyncio.Task] = [] - self.delayed_call: asyncio.Handle = None for peer in get_shortlist(routing_table, key, shortlist): if peer.node_id: self._add_active(peer, force=True) @@ -184,7 +183,6 @@ class IterativeFinder: for index, peer in enumerate(self.active.keys()): if index == 0: log.debug("closest to probe: %s", peer.node_id.hex()[:8]) - if peer in self.contacted: continue if len(self.running_probes) >= constants.ALPHA: @@ -211,37 +209,22 @@ class IterativeFinder: t = self.loop.create_task(self._send_probe(peer)) def callback(_): - for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]: - del self.running_probes[peer] - self._search_task(0.0) + self.running_probes.pop(peer, None) + if self.running: + self._search_round() t.add_done_callback(callback) self.running_probes[peer] = t - def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): - try: - if self.running: - if self.delayed_call: - self.delayed_call.cancel() # ensure anything scheduled gets cancelled - self._search_round() - #if self.running: - # self.delayed_call = self.loop.call_later(delay, self._search) - except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected): - if self.running: - self.loop.call_soon(self.aclose) - def _log_state(self): log.debug("[%s] check result: %i active nodes %i contacted", self.key.hex()[:8], len(self.active), len(self.contacted)) - def _search(self): - self._search_task() - def __aiter__(self): if self.running: raise Exception("already running") self.running = True - self._search() + self.loop.call_soon(self._search_round) return self async def __anext__(self) -> typing.List['KademliaPeer']: @@ -261,11 +244,10 @@ class IterativeFinder: def aclose(self): self.running = False self.iteration_queue.put_nowait(None) - for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])): + for task in chain(self.tasks, self.running_probes.values()): task.cancel() self.tasks.clear() self.running_probes.clear() - self.delayed_call = None class IterativeNodeFinder(IterativeFinder):