wait until k peers are ready. do not double add peers

This commit is contained in:
Victor Shyba 2022-02-07 23:13:58 -03:00
parent 44c4b03d44
commit 2884dba52d

View file

@ -115,7 +115,15 @@ class IterativeFinder:
def is_closest_peer_ready(self):
if not self.closest_peer or not self.prev_closest_peer:
return False
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)
@property
def are_k_closest_peers_ready(self):
if not self.is_closest_peer_ready:
return False
to_probe = list(self.active)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
"""
@ -150,6 +158,8 @@ class IterativeFinder:
def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
return
if peer in self.contacted:
return
if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
log.debug("[%s] closest peer went bad", self.key.hex()[:8])
if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
@ -177,6 +187,7 @@ class IterativeFinder:
self._log_state()
def _reset_closest(self, peer):
self.active.discard(peer)
if peer == self.prev_closest_peer:
self.prev_closest_peer = None
if peer == self.closest_peer:
@ -187,16 +198,15 @@ class IterativeFinder:
response = await self.send_probe(peer)
except asyncio.TimeoutError:
self._reset_closest(peer)
self.active.discard(peer)
return
except ValueError as err:
log.warning(str(err))
self._reset_closest(peer)
self.active.discard(peer)
return
except TransportNotConnected:
return self.aclose()
except RemoteException:
self._reset_closest(peer)
return
return await self._handle_probe_result(peer, response)
@ -338,10 +348,11 @@ class IterativeNodeFinder(IterativeFinder):
else:
self.bottom_out_count = 0
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.debug("limit hit")
if self.are_k_closest_peers_ready:
self.put_result(self.active, True)
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("peer search bottomed out.")
self.put_result(self.active, True)
class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
@ -408,7 +419,10 @@ class IterativeValueFinder(IterativeFinder):
# self.finished.set()
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
if self.are_k_closest_peers_ready:
log.info("blob peer search finished.")
self.iteration_queue.put_nowait(None)
elif self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)