better iterative find, first part

This commit is contained in:
Victor Shyba 2021-12-03 12:15:41 -03:00
parent 7531401623
commit 400c4e429b

View file

@ -85,22 +85,19 @@ class IterativeFinder:
self.protocol = protocol self.protocol = protocol
self.key = key self.key = key
self.bottom_out_limit = bottom_out_limit self.max_results = max(constants.K, max_results)
self.max_results = max_results
self.exclude = exclude or [] self.exclude = exclude or []
self.active: typing.Set['KademliaPeer'] = set() self.active: typing.Set['KademliaPeer'] = set()
self.contacted: typing.Set['KademliaPeer'] = set() self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key) self.distance = Distance(key)
self.closest_peer: typing.Optional['KademliaPeer'] = None self.closest_k_ids: typing.List[bytes] = []
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
self.iteration_queue = asyncio.Queue(loop=self.loop) self.iteration_queue = asyncio.Queue(loop=self.loop)
self.running_probes: typing.Set[asyncio.Task] = set() self.running_probes: typing.Set[asyncio.Task] = set()
self.iteration_count = 0 self.iteration_count = 0
self.bottom_out_count = 0
self.running = False self.running = False
self.tasks: typing.List[asyncio.Task] = [] self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = [] self.delayed_calls: typing.List[asyncio.Handle] = []
@ -139,14 +136,16 @@ class IterativeFinder:
return [] return []
def _is_closer(self, peer: 'KademliaPeer') -> bool: def _is_closer(self, peer: 'KademliaPeer') -> bool:
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) if len(self.closest_k_ids) < self.max_results:
return True
return self.distance.is_closer(peer.node_id, self.closest_k_ids[self.max_results - 1])
def _add_active(self, peer): def _add_active(self, peer):
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer) self.active.add(peer)
if self._is_closer(peer): if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer self.closest_k_ids.append(peer.node_id)
self.closest_peer = peer self.closest_k_ids.sort(key=lambda sorting_peer: self.distance(self.key))
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer) self._add_active(peer)
@ -285,7 +284,7 @@ class IterativeNodeFinder(IterativeFinder):
and self.peer_manager.peer_is_good(peer) is not False and self.peer_manager.peer_is_good(peer) is not False
] ]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:min(constants.K, len(not_yet_yielded))] to_yield = not_yet_yielded[:self.max_results]
if to_yield: if to_yield:
self.yielded_peers.update(to_yield) self.yielded_peers.update(to_yield)
self.iteration_queue.put_nowait(to_yield) self.iteration_queue.put_nowait(to_yield)
@ -297,17 +296,7 @@ class IterativeNodeFinder(IterativeFinder):
if found: if found:
log.debug("found") log.debug("found")
return self.put_result(self.active, finish=True) return self.put_result(self.active.union(self.contacted), finish=True)
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count)
self.bottom_out_count = 0
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit")
self.put_result(self.active, True)
class IterativeValueFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder):
@ -359,7 +348,6 @@ class IterativeValueFinder(IterativeFinder):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses] for compact_addr in response.found_compact_addresses]
to_yield = [] to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers: for blob_peer in blob_peers:
if blob_peer not in self.blob_peers: if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer) self.blob_peers.add(blob_peer)
@ -371,11 +359,6 @@ class IterativeValueFinder(IterativeFinder):
# log.info("enough blob peers found") # log.info("enough blob peers found")
# if not self.finished.is_set(): # if not self.finished.is_set():
# self.finished.set() # self.finished.set()
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)
def get_initial_result(self) -> typing.List['KademliaPeer']: def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key): if self.protocol.data_store.has_peers_for_blob(self.key):