forked from LBRYCommunity/lbry-sdk
use a dict for the active queue
This commit is contained in:
parent
3876e0317d
commit
7d4966e2ae
1 changed files with 22 additions and 15 deletions
|
@ -89,7 +89,7 @@ class IterativeFinder:
|
||||||
self.max_results = max_results
|
self.max_results = max_results
|
||||||
self.exclude = exclude or []
|
self.exclude = exclude or []
|
||||||
|
|
||||||
self.active: typing.Set['KademliaPeer'] = set()
|
self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted
|
||||||
self.contacted: typing.Set['KademliaPeer'] = set()
|
self.contacted: typing.Set['KademliaPeer'] = set()
|
||||||
self.distance = Distance(key)
|
self.distance = Distance(key)
|
||||||
|
|
||||||
|
@ -119,11 +119,12 @@ class IterativeFinder:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def are_k_closest_peers_ready(self):
|
def are_k_closest_peers_ready(self):
|
||||||
if not self.is_closest_peer_ready:
|
if not self.is_closest_peer_ready or len(self.active) < self.max_results:
|
||||||
return False
|
return False
|
||||||
to_probe = list(self.active)
|
for peer in list(self.active.keys())[:self.max_results]:
|
||||||
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
|
if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
|
||||||
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
|
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
|
||||||
"""
|
"""
|
||||||
|
@ -169,7 +170,8 @@ class IterativeFinder:
|
||||||
self.closest_peer = None
|
self.closest_peer = None
|
||||||
self.prev_closest_peer = None
|
self.prev_closest_peer = None
|
||||||
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
|
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
|
||||||
self.active.add(peer)
|
self.active[peer] = self.distance(peer.node_id)
|
||||||
|
self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
|
||||||
if self._is_closer(peer):
|
if self._is_closer(peer):
|
||||||
self.prev_closest_peer = self.closest_peer
|
self.prev_closest_peer = self.closest_peer
|
||||||
self.closest_peer = peer
|
self.closest_peer = peer
|
||||||
|
@ -187,7 +189,8 @@ class IterativeFinder:
|
||||||
self._log_state()
|
self._log_state()
|
||||||
|
|
||||||
def _reset_closest(self, peer):
|
def _reset_closest(self, peer):
|
||||||
self.active.discard(peer)
|
if peer in self.active:
|
||||||
|
del self.active[peer]
|
||||||
if peer == self.prev_closest_peer:
|
if peer == self.prev_closest_peer:
|
||||||
self.prev_closest_peer = None
|
self.prev_closest_peer = None
|
||||||
if peer == self.closest_peer:
|
if peer == self.closest_peer:
|
||||||
|
@ -216,10 +219,14 @@ class IterativeFinder:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
added = 0
|
added = 0
|
||||||
to_probe = list(self.active - self.contacted)
|
for index, peer in enumerate(self.active.keys()):
|
||||||
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
|
if index == 0:
|
||||||
log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
|
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
|
||||||
for peer in to_probe:
|
if self.closest_peer != peer:
|
||||||
|
self.prev_closest_peer = self.closest_peer
|
||||||
|
self.closest_peer = peer
|
||||||
|
if peer in self.contacted:
|
||||||
|
continue
|
||||||
if added >= constants.ALPHA:
|
if added >= constants.ALPHA:
|
||||||
break
|
break
|
||||||
origin_address = (peer.address, peer.udp_port)
|
origin_address = (peer.address, peer.udp_port)
|
||||||
|
@ -320,7 +327,7 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
return FindNodeResponse(self.key, response)
|
return FindNodeResponse(self.key, response)
|
||||||
|
|
||||||
def search_exhausted(self):
|
def search_exhausted(self):
|
||||||
self.put_result(self.active, finish=True)
|
self.put_result(self.active.keys(), finish=True)
|
||||||
|
|
||||||
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
|
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
|
||||||
not_yet_yielded = [
|
not_yet_yielded = [
|
||||||
|
@ -342,17 +349,17 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
log.debug("found")
|
log.debug("found")
|
||||||
return self.put_result(self.active, finish=True)
|
return self.put_result(self.active.keys(), finish=True)
|
||||||
elif self.is_closest_peer_ready:
|
elif self.is_closest_peer_ready:
|
||||||
self.bottom_out_count += 1
|
self.bottom_out_count += 1
|
||||||
else:
|
else:
|
||||||
self.bottom_out_count = 0
|
self.bottom_out_count = 0
|
||||||
|
|
||||||
if self.are_k_closest_peers_ready:
|
if self.are_k_closest_peers_ready:
|
||||||
self.put_result(self.active, True)
|
self.put_result(self.active.keys(), True)
|
||||||
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
||||||
log.debug("peer search bottomed out.")
|
log.debug("peer search bottomed out.")
|
||||||
self.put_result(self.active, True)
|
self.put_result(self.active.keys(), True)
|
||||||
|
|
||||||
|
|
||||||
class IterativeValueFinder(IterativeFinder):
|
class IterativeValueFinder(IterativeFinder):
|
||||||
|
|
Loading…
Reference in a new issue