diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index 6516d7114..e8d4a6286 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -87,9 +87,8 @@ class IterativeFinder: self.max_results = max_results self.exclude = exclude or [] - self.shortlist: typing.List['KademliaPeer'] = get_shortlist(routing_table, key, shortlist) - self.active: typing.List['KademliaPeer'] = [] - self.contacted: typing.Set[typing.Tuple[str, int]] = set() + self.active: typing.Set['KademliaPeer'] = set() + self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) self.closest_peer: typing.Optional['KademliaPeer'] = None @@ -103,6 +102,12 @@ class IterativeFinder: self.running = False self.tasks: typing.List[asyncio.Task] = [] self.delayed_calls: typing.List[asyncio.Handle] = [] + for peer in get_shortlist(routing_table, key, shortlist): + if peer.node_id: + self._add_active(peer) + else: + # seed nodes + self._schedule_probe(peer) async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ @@ -134,36 +139,32 @@ class IterativeFinder: def _is_closer(self, peer: 'KademliaPeer') -> bool: return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) - def _update_closest(self): - self.active.sort(key=lambda peer: self.distance(peer.node_id)) - if self.closest_peer and self.closest_peer is not self.active[0]: - if self._is_closer(self.active[0]): - self.prev_closest_peer = self.closest_peer - self.closest_peer = self.active[0] + def _add_active(self, peer): + if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: + if self.peer_manager.peer_is_good(peer) is not False: + self.active.add(peer) + if self._is_closer(peer): + self.prev_closest_peer = self.closest_peer + self.closest_peer = peer + else: + self.protocol.remove_peer(peer) async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): - if peer not in self.active and peer.node_id: - self.active.append(peer) + self._add_active(peer) for contact_triple in response.get_close_triples(): node_id, address, udp_port = contact_triple - if (address, udp_port) not in self.contacted: # and not self.peer_manager.is_ignored(addr_tuple) - found_peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port) - if found_peer not in self.active and self.peer_manager.peer_is_good(found_peer) is not False: - self.active.append(found_peer) - self._update_closest() + self._add_active(self.peer_manager.get_kademlia_peer(node_id, address, udp_port)) self.check_result_ready(response) async def _send_probe(self, peer: 'KademliaPeer'): try: response = await self.send_probe(peer) except asyncio.TimeoutError: - if peer in self.active: - self.active.remove(peer) + self.active.discard(peer) return except ValueError as err: log.warning(str(err)) - if peer in self.active: - self.active.remove(peer) + self.active.discard(peer) return except TransportNotConnected: return self.aclose() @@ -173,43 +174,44 @@ class IterativeFinder: async def _search_round(self): """ - Send up to constants.alpha (5) probes to the closest peers in the shortlist + Send up to constants.alpha (5) probes to closest active peers """ added = 0 - for peer in chain(self.active, self.shortlist): + to_probe = list(self.active - self.contacted) + to_probe.sort(key=lambda peer: self.distance(self.key)) + for peer in to_probe: if added >= constants.alpha: break origin_address = (peer.address, peer.udp_port) - if self.peer_manager.peer_is_good(peer) is False: - self.protocol.remove_peer(peer) - continue if origin_address in self.exclude: continue if peer.node_id == self.protocol.node_id: continue if origin_address == (self.protocol.external_ip, self.protocol.udp_port): continue - if origin_address not in self.contacted: - self.contacted.add(origin_address) - - t = self.loop.create_task(self._send_probe(peer)) - - def callback(_): - self.running_probes.difference_update({ - probe for probe in self.running_probes if probe.done() or probe == t - }) - if not self.running_probes: - self.tasks.append(self.loop.create_task(self._search_task(0.0))) - - t.add_done_callback(callback) - self.running_probes.add(t) - added += 1 + self._schedule_probe(peer) + added += 1 log.debug("running %d probes", len(self.running_probes)) if not added and not self.running_probes: log.debug("search for %s exhausted", hexlify(self.key)[:8]) self.search_exhausted() + def _schedule_probe(self, peer: 'KademliaPeer'): + self.contacted.add(peer) + + t = self.loop.create_task(self._send_probe(peer)) + + def callback(_): + self.running_probes.difference_update({ + probe for probe in self.running_probes if probe.done() or probe == t + }) + if not self.running_probes: + self.tasks.append(self.loop.create_task(self._search_task(0.0))) + + t.add_done_callback(callback) + self.running_probes.add(t) + async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay): try: if self.running: @@ -296,8 +298,7 @@ class IterativeNodeFinder(IterativeFinder): self.bottom_out_count = 0 elif self.prev_closest_peer and self.closest_peer: self.bottom_out_count += 1 - log.info("bottom out %i %i %i %i", len(self.active), len(self.contacted), len(self.shortlist), - self.bottom_out_count) + log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count) if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: log.info("limit hit") self.put_result(self.active, True)