reorganizing iterative find code
This commit is contained in:
parent
18af2dcd4e
commit
fb457c820a
1 changed files with 43 additions and 42 deletions
|
@ -87,9 +87,8 @@ class IterativeFinder:
|
||||||
self.max_results = max_results
|
self.max_results = max_results
|
||||||
self.exclude = exclude or []
|
self.exclude = exclude or []
|
||||||
|
|
||||||
self.shortlist: typing.List['KademliaPeer'] = get_shortlist(routing_table, key, shortlist)
|
self.active: typing.Set['KademliaPeer'] = set()
|
||||||
self.active: typing.List['KademliaPeer'] = []
|
self.contacted: typing.Set['KademliaPeer'] = set()
|
||||||
self.contacted: typing.Set[typing.Tuple[str, int]] = set()
|
|
||||||
self.distance = Distance(key)
|
self.distance = Distance(key)
|
||||||
|
|
||||||
self.closest_peer: typing.Optional['KademliaPeer'] = None
|
self.closest_peer: typing.Optional['KademliaPeer'] = None
|
||||||
|
@ -103,6 +102,12 @@ class IterativeFinder:
|
||||||
self.running = False
|
self.running = False
|
||||||
self.tasks: typing.List[asyncio.Task] = []
|
self.tasks: typing.List[asyncio.Task] = []
|
||||||
self.delayed_calls: typing.List[asyncio.Handle] = []
|
self.delayed_calls: typing.List[asyncio.Handle] = []
|
||||||
|
for peer in get_shortlist(routing_table, key, shortlist):
|
||||||
|
if peer.node_id:
|
||||||
|
self._add_active(peer)
|
||||||
|
else:
|
||||||
|
# seed nodes
|
||||||
|
self._schedule_probe(peer)
|
||||||
|
|
||||||
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
|
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
|
||||||
"""
|
"""
|
||||||
|
@ -134,36 +139,32 @@ class IterativeFinder:
|
||||||
def _is_closer(self, peer: 'KademliaPeer') -> bool:
|
def _is_closer(self, peer: 'KademliaPeer') -> bool:
|
||||||
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
|
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
|
||||||
|
|
||||||
def _update_closest(self):
|
def _add_active(self, peer):
|
||||||
self.active.sort(key=lambda peer: self.distance(peer.node_id))
|
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
|
||||||
if self.closest_peer and self.closest_peer is not self.active[0]:
|
if self.peer_manager.peer_is_good(peer) is not False:
|
||||||
if self._is_closer(self.active[0]):
|
self.active.add(peer)
|
||||||
|
if self._is_closer(peer):
|
||||||
self.prev_closest_peer = self.closest_peer
|
self.prev_closest_peer = self.closest_peer
|
||||||
self.closest_peer = self.active[0]
|
self.closest_peer = peer
|
||||||
|
else:
|
||||||
|
self.protocol.remove_peer(peer)
|
||||||
|
|
||||||
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
|
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
|
||||||
if peer not in self.active and peer.node_id:
|
self._add_active(peer)
|
||||||
self.active.append(peer)
|
|
||||||
for contact_triple in response.get_close_triples():
|
for contact_triple in response.get_close_triples():
|
||||||
node_id, address, udp_port = contact_triple
|
node_id, address, udp_port = contact_triple
|
||||||
if (address, udp_port) not in self.contacted: # and not self.peer_manager.is_ignored(addr_tuple)
|
self._add_active(self.peer_manager.get_kademlia_peer(node_id, address, udp_port))
|
||||||
found_peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port)
|
|
||||||
if found_peer not in self.active and self.peer_manager.peer_is_good(found_peer) is not False:
|
|
||||||
self.active.append(found_peer)
|
|
||||||
self._update_closest()
|
|
||||||
self.check_result_ready(response)
|
self.check_result_ready(response)
|
||||||
|
|
||||||
async def _send_probe(self, peer: 'KademliaPeer'):
|
async def _send_probe(self, peer: 'KademliaPeer'):
|
||||||
try:
|
try:
|
||||||
response = await self.send_probe(peer)
|
response = await self.send_probe(peer)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
if peer in self.active:
|
self.active.discard(peer)
|
||||||
self.active.remove(peer)
|
|
||||||
return
|
return
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
log.warning(str(err))
|
log.warning(str(err))
|
||||||
if peer in self.active:
|
self.active.discard(peer)
|
||||||
self.active.remove(peer)
|
|
||||||
return
|
return
|
||||||
except TransportNotConnected:
|
except TransportNotConnected:
|
||||||
return self.aclose()
|
return self.aclose()
|
||||||
|
@ -173,25 +174,31 @@ class IterativeFinder:
|
||||||
|
|
||||||
async def _search_round(self):
|
async def _search_round(self):
|
||||||
"""
|
"""
|
||||||
Send up to constants.alpha (5) probes to the closest peers in the shortlist
|
Send up to constants.alpha (5) probes to closest active peers
|
||||||
"""
|
"""
|
||||||
|
|
||||||
added = 0
|
added = 0
|
||||||
for peer in chain(self.active, self.shortlist):
|
to_probe = list(self.active - self.contacted)
|
||||||
|
to_probe.sort(key=lambda peer: self.distance(self.key))
|
||||||
|
for peer in to_probe:
|
||||||
if added >= constants.alpha:
|
if added >= constants.alpha:
|
||||||
break
|
break
|
||||||
origin_address = (peer.address, peer.udp_port)
|
origin_address = (peer.address, peer.udp_port)
|
||||||
if self.peer_manager.peer_is_good(peer) is False:
|
|
||||||
self.protocol.remove_peer(peer)
|
|
||||||
continue
|
|
||||||
if origin_address in self.exclude:
|
if origin_address in self.exclude:
|
||||||
continue
|
continue
|
||||||
if peer.node_id == self.protocol.node_id:
|
if peer.node_id == self.protocol.node_id:
|
||||||
continue
|
continue
|
||||||
if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
|
if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
|
||||||
continue
|
continue
|
||||||
if origin_address not in self.contacted:
|
self._schedule_probe(peer)
|
||||||
self.contacted.add(origin_address)
|
added += 1
|
||||||
|
log.debug("running %d probes", len(self.running_probes))
|
||||||
|
if not added and not self.running_probes:
|
||||||
|
log.debug("search for %s exhausted", hexlify(self.key)[:8])
|
||||||
|
self.search_exhausted()
|
||||||
|
|
||||||
|
def _schedule_probe(self, peer: 'KademliaPeer'):
|
||||||
|
self.contacted.add(peer)
|
||||||
|
|
||||||
t = self.loop.create_task(self._send_probe(peer))
|
t = self.loop.create_task(self._send_probe(peer))
|
||||||
|
|
||||||
|
@ -204,11 +211,6 @@ class IterativeFinder:
|
||||||
|
|
||||||
t.add_done_callback(callback)
|
t.add_done_callback(callback)
|
||||||
self.running_probes.add(t)
|
self.running_probes.add(t)
|
||||||
added += 1
|
|
||||||
log.debug("running %d probes", len(self.running_probes))
|
|
||||||
if not added and not self.running_probes:
|
|
||||||
log.debug("search for %s exhausted", hexlify(self.key)[:8])
|
|
||||||
self.search_exhausted()
|
|
||||||
|
|
||||||
async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay):
|
async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay):
|
||||||
try:
|
try:
|
||||||
|
@ -296,8 +298,7 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
self.bottom_out_count = 0
|
self.bottom_out_count = 0
|
||||||
elif self.prev_closest_peer and self.closest_peer:
|
elif self.prev_closest_peer and self.closest_peer:
|
||||||
self.bottom_out_count += 1
|
self.bottom_out_count += 1
|
||||||
log.info("bottom out %i %i %i %i", len(self.active), len(self.contacted), len(self.shortlist),
|
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
|
||||||
self.bottom_out_count)
|
|
||||||
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
||||||
log.info("limit hit")
|
log.info("limit hit")
|
||||||
self.put_result(self.active, True)
|
self.put_result(self.active, True)
|
||||||
|
|
Loading…
Add table
Reference in a new issue