forked from LBRYCommunity/lbry-sdk
no stop condition, let it exhaust
This commit is contained in:
parent
c45f27d5cc
commit
dc1c0e6851
1 changed files with 18 additions and 79 deletions
|
@ -93,17 +93,14 @@ class IterativeFinder:
|
|||
self.contacted: typing.Set['KademliaPeer'] = set()
|
||||
self.distance = Distance(key)
|
||||
|
||||
self.closest_peer: typing.Optional['KademliaPeer'] = None
|
||||
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
|
||||
|
||||
self.iteration_queue = asyncio.Queue(loop=self.loop)
|
||||
|
||||
self.running_probes: typing.Set[asyncio.Task] = set()
|
||||
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
|
||||
self.iteration_count = 0
|
||||
self.bottom_out_count = 0
|
||||
self.running = False
|
||||
self.tasks: typing.List[asyncio.Task] = []
|
||||
self.delayed_calls: typing.List[asyncio.Handle] = []
|
||||
self.delayed_call: asyncio.Handle = None
|
||||
for peer in get_shortlist(routing_table, key, shortlist):
|
||||
if peer.node_id:
|
||||
self._add_active(peer, force=True)
|
||||
|
@ -111,21 +108,6 @@ class IterativeFinder:
|
|||
# seed nodes
|
||||
self._schedule_probe(peer)
|
||||
|
||||
@property
|
||||
def is_closest_peer_ready(self):
|
||||
if not self.closest_peer or not self.prev_closest_peer:
|
||||
return False
|
||||
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)
|
||||
|
||||
@property
|
||||
def are_k_closest_peers_ready(self):
|
||||
if not self.is_closest_peer_ready or len(self.active) < self.max_results:
|
||||
return False
|
||||
for peer in list(self.active.keys())[:self.max_results]:
|
||||
if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
|
||||
return False
|
||||
return True
|
||||
|
||||
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
|
||||
"""
|
||||
Send the rpc request to the peer and return an object with the FindResponse interface
|
||||
|
@ -153,28 +135,14 @@ class IterativeFinder:
|
|||
"""
|
||||
return []
|
||||
|
||||
def _is_closer(self, peer: 'KademliaPeer') -> bool:
|
||||
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
|
||||
|
||||
def _add_active(self, peer, force=False):
|
||||
if not force and self.peer_manager.peer_is_good(peer) is False:
|
||||
return
|
||||
if peer in self.contacted:
|
||||
return
|
||||
if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
|
||||
log.debug("[%s] closest peer went bad", self.key.hex()[:8])
|
||||
if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
|
||||
log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
|
||||
self.closest_peer = self.prev_closest_peer
|
||||
else:
|
||||
self.closest_peer = None
|
||||
self.prev_closest_peer = None
|
||||
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
|
||||
self.active[peer] = self.distance(peer.node_id)
|
||||
self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
|
||||
if self._is_closer(peer):
|
||||
self.prev_closest_peer = self.closest_peer
|
||||
self.closest_peer = peer
|
||||
|
||||
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
|
||||
self._add_active(peer)
|
||||
|
@ -191,10 +159,6 @@ class IterativeFinder:
|
|||
def _reset_closest(self, peer):
|
||||
if peer in self.active:
|
||||
del self.active[peer]
|
||||
if peer == self.prev_closest_peer:
|
||||
self.prev_closest_peer = None
|
||||
if peer == self.closest_peer:
|
||||
self.closest_peer = self.prev_closest_peer
|
||||
|
||||
async def _send_probe(self, peer: 'KademliaPeer'):
|
||||
try:
|
||||
|
@ -213,7 +177,7 @@ class IterativeFinder:
|
|||
return
|
||||
return await self._handle_probe_result(peer, response)
|
||||
|
||||
async def _search_round(self):
|
||||
def _search_round(self):
|
||||
"""
|
||||
Send up to constants.alpha (5) probes to closest active peers
|
||||
"""
|
||||
|
@ -222,14 +186,11 @@ class IterativeFinder:
|
|||
for index, peer in enumerate(self.active.keys()):
|
||||
if index == 0:
|
||||
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
|
||||
if self.closest_peer != peer:
|
||||
self.prev_closest_peer = self.closest_peer
|
||||
self.closest_peer = peer
|
||||
if peer in self.contacted:
|
||||
continue
|
||||
if added >= constants.ALPHA:
|
||||
if len(self.running_probes) >= constants.ALPHA:
|
||||
break
|
||||
if index > self.max_results:
|
||||
if index > (constants.K - 1):
|
||||
break
|
||||
origin_address = (peer.address, peer.udp_port)
|
||||
if origin_address in self.exclude:
|
||||
|
@ -251,21 +212,21 @@ class IterativeFinder:
|
|||
t = self.loop.create_task(self._send_probe(peer))
|
||||
|
||||
def callback(_):
|
||||
self.running_probes.difference_update({
|
||||
probe for probe in self.running_probes if probe.done() or probe == t
|
||||
})
|
||||
if not self.running_probes:
|
||||
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
|
||||
for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
|
||||
del self.running_probes[peer]
|
||||
self._search_task(0.0)
|
||||
|
||||
t.add_done_callback(callback)
|
||||
self.running_probes.add(t)
|
||||
self.running_probes[peer] = t
|
||||
|
||||
async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
|
||||
def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
|
||||
try:
|
||||
if self.running:
|
||||
await self._search_round()
|
||||
if self.running:
|
||||
self.delayed_calls.append(self.loop.call_later(delay, self._search))
|
||||
if self.delayed_call:
|
||||
self.delayed_call.cancel() # ensure anything scheduled gets cancelled
|
||||
self._search_round()
|
||||
#if self.running:
|
||||
# self.delayed_call = self.loop.call_later(delay, self._search)
|
||||
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
|
||||
if self.running:
|
||||
self.loop.call_soon(self.aclose)
|
||||
|
@ -273,13 +234,9 @@ class IterativeFinder:
|
|||
def _log_state(self):
|
||||
log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
|
||||
self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
|
||||
if self.closest_peer and self.prev_closest_peer:
|
||||
log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
|
||||
self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
|
||||
self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
|
||||
|
||||
def _search(self):
|
||||
self.tasks.append(self.loop.create_task(self._search_task()))
|
||||
self._search_task()
|
||||
|
||||
def __aiter__(self):
|
||||
if self.running:
|
||||
|
@ -305,11 +262,11 @@ class IterativeFinder:
|
|||
def aclose(self):
|
||||
self.running = False
|
||||
self.iteration_queue.put_nowait(None)
|
||||
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
|
||||
for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
|
||||
task.cancel()
|
||||
self.tasks.clear()
|
||||
self.running_probes.clear()
|
||||
self.delayed_calls.clear()
|
||||
self.delayed_call = None
|
||||
|
||||
|
||||
class IterativeNodeFinder(IterativeFinder):
|
||||
|
@ -352,16 +309,6 @@ class IterativeNodeFinder(IterativeFinder):
|
|||
if found:
|
||||
log.debug("found")
|
||||
return self.put_result(self.active.keys(), finish=True)
|
||||
elif self.is_closest_peer_ready:
|
||||
self.bottom_out_count += 1
|
||||
else:
|
||||
self.bottom_out_count = 0
|
||||
|
||||
if self.are_k_closest_peers_ready:
|
||||
self.put_result(self.active.keys(), True)
|
||||
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
||||
log.warning("peer search bottomed out.")
|
||||
self.put_result([], True)
|
||||
|
||||
|
||||
class IterativeValueFinder(IterativeFinder):
|
||||
|
@ -427,14 +374,6 @@ class IterativeValueFinder(IterativeFinder):
|
|||
# log.info("enough blob peers found")
|
||||
# if not self.finished.is_set():
|
||||
# self.finished.set()
|
||||
elif self.is_closest_peer_ready:
|
||||
self.bottom_out_count += 1
|
||||
if self.are_k_closest_peers_ready:
|
||||
log.info("blob peer search finished for %s", self.key.hex()[:8])
|
||||
self.iteration_queue.put_nowait(None)
|
||||
elif self.bottom_out_count >= self.bottom_out_limit:
|
||||
log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
|
||||
self.iteration_queue.put_nowait(None)
|
||||
|
||||
def get_initial_result(self) -> typing.List['KademliaPeer']:
|
||||
if self.protocol.data_store.has_peers_for_blob(self.key):
|
||||
|
|
Loading…
Reference in a new issue