forked from LBRYCommunity/lbry-sdk
wait until k peers are ready. do not double add peers
This commit is contained in:
parent
af1a6edd15
commit
9a79b33664
1 changed files with 21 additions and 7 deletions
|
@ -115,7 +115,15 @@ class IterativeFinder:
|
|||
def is_closest_peer_ready(self):
|
||||
if not self.closest_peer or not self.prev_closest_peer:
|
||||
return False
|
||||
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False
|
||||
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)
|
||||
|
||||
@property
|
||||
def are_k_closest_peers_ready(self):
|
||||
if not self.is_closest_peer_ready:
|
||||
return False
|
||||
to_probe = list(self.active)
|
||||
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
|
||||
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
|
||||
|
||||
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
|
||||
"""
|
||||
|
@ -150,6 +158,8 @@ class IterativeFinder:
|
|||
def _add_active(self, peer, force=False):
|
||||
if not force and self.peer_manager.peer_is_good(peer) is False:
|
||||
return
|
||||
if peer in self.contacted:
|
||||
return
|
||||
if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
|
||||
log.debug("[%s] closest peer went bad", self.key.hex()[:8])
|
||||
if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
|
||||
|
@ -177,6 +187,7 @@ class IterativeFinder:
|
|||
self._log_state()
|
||||
|
||||
def _reset_closest(self, peer):
|
||||
self.active.discard(peer)
|
||||
if peer == self.prev_closest_peer:
|
||||
self.prev_closest_peer = None
|
||||
if peer == self.closest_peer:
|
||||
|
@ -187,16 +198,15 @@ class IterativeFinder:
|
|||
response = await self.send_probe(peer)
|
||||
except asyncio.TimeoutError:
|
||||
self._reset_closest(peer)
|
||||
self.active.discard(peer)
|
||||
return
|
||||
except ValueError as err:
|
||||
log.warning(str(err))
|
||||
self._reset_closest(peer)
|
||||
self.active.discard(peer)
|
||||
return
|
||||
except TransportNotConnected:
|
||||
return self.aclose()
|
||||
except RemoteException:
|
||||
self._reset_closest(peer)
|
||||
return
|
||||
return await self._handle_probe_result(peer, response)
|
||||
|
||||
|
@ -338,10 +348,11 @@ class IterativeNodeFinder(IterativeFinder):
|
|||
else:
|
||||
self.bottom_out_count = 0
|
||||
|
||||
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
||||
log.debug("limit hit")
|
||||
if self.are_k_closest_peers_ready:
|
||||
self.put_result(self.active, True)
|
||||
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
||||
log.info("peer search bottomed out.")
|
||||
self.put_result(self.active, True)
|
||||
|
||||
|
||||
class IterativeValueFinder(IterativeFinder):
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||
|
@ -408,7 +419,10 @@ class IterativeValueFinder(IterativeFinder):
|
|||
# self.finished.set()
|
||||
elif self.is_closest_peer_ready:
|
||||
self.bottom_out_count += 1
|
||||
if self.bottom_out_count >= self.bottom_out_limit:
|
||||
if self.are_k_closest_peers_ready:
|
||||
log.info("blob peer search finished.")
|
||||
self.iteration_queue.put_nowait(None)
|
||||
elif self.bottom_out_count >= self.bottom_out_limit:
|
||||
log.info("blob peer search bottomed out")
|
||||
self.iteration_queue.put_nowait(None)
|
||||
|
||||
|
|
Loading…
Reference in a new issue