2019-01-22 18:49:43 +01:00
|
|
|
import asyncio
|
2019-01-31 05:13:01 +01:00
|
|
|
from itertools import chain
|
2022-02-19 06:32:12 +01:00
|
|
|
from collections import defaultdict, OrderedDict
|
2022-04-12 00:17:16 +02:00
|
|
|
from collections.abc import AsyncGenerator
|
2019-01-22 18:49:43 +01:00
|
|
|
import typing
|
|
|
|
import logging
|
2020-01-03 05:31:28 +01:00
|
|
|
from typing import TYPE_CHECKING
|
2019-06-21 02:55:47 +02:00
|
|
|
from lbry.dht import constants
|
|
|
|
from lbry.dht.error import RemoteException, TransportNotConnected
|
|
|
|
from lbry.dht.protocol.distance import Distance
|
2019-10-01 02:00:10 +02:00
|
|
|
from lbry.dht.peer import make_kademlia_peer
|
2019-11-26 20:48:30 +01:00
|
|
|
from lbry.dht.serialization.datagram import PAGE_KEY
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
2019-06-21 02:55:47 +02:00
|
|
|
from lbry.dht.protocol.routing_table import TreeRoutingTable
|
|
|
|
from lbry.dht.protocol.protocol import KademliaProtocol
|
|
|
|
from lbry.dht.peer import PeerManager, KademliaPeer
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
# Module-level logger; the finders below report probe progress (debug) and misbehaving peers (warning) through it.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class FindResponse:
    """
    Common interface for the parsed reply of a find_node / find_value rpc.

    Both members are abstract; on this base class each raises NotImplementedError.
    """

    @property
    def found(self) -> bool:
        """Whether the reply contains the item that was searched for. Subclasses must override."""
        raise NotImplementedError

    def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
        """Return the (node_id, address, udp_port) contacts carried by the reply. Subclasses must override."""
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
|
class FindNodeResponse(FindResponse):
    """Parsed reply to a find_node rpc: the contacts the remote peer knows that are close to ``key``."""

    def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
        self.key = key
        # (node_id, address, udp_port) triples, exactly as handed in
        self.close_triples = close_triples

    @property
    def found(self) -> bool:
        """True when one of the returned contacts has ``key`` as its node id."""
        returned_ids = [triple[0] for triple in self.close_triples]
        return self.key in returned_ids

    def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
        """Return the contact triples unchanged (addresses are already ``str`` here)."""
        return self.close_triples
|
|
|
|
|
|
|
|
|
|
|
|
class FindValueResponse(FindResponse):
    """
    Parsed reply to a find_value rpc.

    ``result_dict`` must contain ``b'token'`` (a KeyError is raised otherwise). Contacts are read from
    ``b'contacts'``, compact peer addresses for the blob from the ``key`` entry itself, and the number of result
    pages from ``PAGE_KEY``; each of these defaults to empty / 0 when absent.
    """

    def __init__(self, key: bytes, result_dict: typing.Dict):
        self.key = key
        self.token = result_dict[b'token']
        contacts = result_dict.get(b'contacts', [])
        # addresses are still bytes at this point; get_close_triples decodes them
        self.close_triples: typing.List[typing.Tuple[bytes, bytes, int]] = contacts
        self.found_compact_addresses = result_dict.get(key, [])
        page_count = result_dict.get(PAGE_KEY, 0)
        self.pages = int(page_count)

    @property
    def found(self) -> bool:
        """True when the reply carried at least one compact address for the blob."""
        return len(self.found_compact_addresses) != 0

    def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
        """Return the contacts with their address bytes decoded to ``str``."""
        decoded = []
        for node_id, raw_address, udp_port in self.close_triples:
            decoded.append((node_id, raw_address.decode(), udp_port))
        return decoded
|
|
|
|
|
|
|
|
|
|
|
|
def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
                  shortlist: typing.Optional[typing.List['KademliaPeer']]) -> typing.List['KademliaPeer']:
    """
    Pick the peers a search for ``key`` starts from.

    A non-empty ``shortlist`` is returned as-is. This is used during bootstrapping, when the routing table has no
    peers yet and the shortlist holds the seed nodes. Otherwise (None or empty) the closest peers from the routing
    table are used.

    :param routing_table: a TreeRoutingTable
    :param key: the hash being searched for; must be constants.HASH_LENGTH bytes long
    :param shortlist: optional manually provided starting peers
    :raises ValueError: if ``key`` has the wrong length
    """
    key_length = len(key)
    if key_length != constants.HASH_LENGTH:
        raise ValueError("invalid key length: %i" % key_length)
    if shortlist:
        return shortlist
    return routing_table.find_close_peers(key)
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
|
2022-04-12 00:17:16 +02:00
|
|
|
class IterativeFinder(AsyncGenerator):
    """
    Base class for an iterative Kademlia lookup of ``key``, consumed with ``async for``.

    Known peers are kept in ``active``, ordered by XOR distance to ``key``. Each search round schedules probes to the
    closest active peers that have not been contacted yet, and stops scheduling once ``constants.ALPHA`` probes are
    in flight. When a probe finishes, the contacts it returned are merged into ``active``. While the search is
    running, another round is then started. Subclasses implement ``send_probe`` and ``check_result_ready`` and put
    batches of results on ``iteration_queue``; a falsy item (e.g. ``None``) ends the iteration.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                 routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
                 max_results: typing.Optional[int] = constants.K,
                 exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                 shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
        """
        :param loop: event loop the probe tasks are created on
        :param key: the hash being searched for, constants.HASH_LENGTH bytes long
        :param max_results: number of result batches to yield before stopping; values below constants.K, or None,
            are raised to constants.K
        :param exclude: (address, udp_port) pairs that search rounds never probe
        :param shortlist: optional starting peers (see get_shortlist); peers without a node_id are treated as seed
            nodes and probed immediately
        :raises ValueError: if ``key`` has the wrong length
        """
        if len(key) != constants.HASH_LENGTH:
            raise ValueError("invalid key length: %i" % len(key))
        self.loop = loop
        self.peer_manager = peer_manager
        self.routing_table = routing_table
        self.protocol = protocol

        self.key = key
        # max_results is typed Optional, but max(K, None) raises TypeError; treat None as "use the default"
        self.max_results = max(constants.K, constants.K if max_results is None else max_results)
        self.exclude = exclude or []

        self.active: typing.Dict['KademliaPeer', int] = OrderedDict()  # peer: distance, sorted
        self.contacted: typing.Set['KademliaPeer'] = set()
        self.distance = Distance(key)

        # asyncio.Queue's ``loop`` argument was deprecated in Python 3.8 and removed in 3.10, where passing it raises
        # TypeError. NOTE(review): on 3.7-3.9 the queue binds to asyncio.get_event_loop() when constructed, so this
        # assumes the finder is created while ``loop`` is the current event loop.
        self.iteration_queue = asyncio.Queue()

        self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
        self.iteration_count = 0
        self.running = False
        self.tasks: typing.List[asyncio.Task] = []
        self.generator = None

        for peer in get_shortlist(routing_table, key, shortlist):
            if peer.node_id:
                self._add_active(peer, force=True)
            else:
                # seed nodes
                self._schedule_probe(peer)

    async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
        """
        Send the rpc request to the peer and return an object with the FindResponse interface
        """
        raise NotImplementedError()

    def search_exhausted(self):
        """
        This method ends the iterator due no more peers to contact.
        Override to provide last time results.
        """
        self.iteration_queue.put_nowait(None)

    def check_result_ready(self, response: FindResponse):
        """
        Called after adding peers from an rpc result to the shortlist.
        This method is responsible for putting a result for the generator into the Queue
        """
        raise NotImplementedError()

    def get_initial_result(self) -> typing.List['KademliaPeer']:  #pylint: disable=no-self-use
        """
        Get an initial or cached result to be put into the Queue. Used for findValue requests where the blob
        has peers in the local data store of blobs announced to us
        """
        return []

    def _add_active(self, peer, force=False):
        """
        Add ``peer`` to ``active``, keeping ``active`` sorted by distance to ``key``.

        Peers already contacted, peers without a node_id, and this node itself are ignored. Unless ``force`` is set,
        peers for which the peer manager's peer_is_good() returns False are ignored too.
        """
        if not force and self.peer_manager.peer_is_good(peer) is False:
            return
        if peer in self.contacted:
            return
        if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
            self.active[peer] = self.distance(peer.node_id)
            self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))

    async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
        """Merge the responding peer and the contacts it returned into ``active``, then let the subclass react."""
        self._add_active(peer)
        for contact_triple in response.get_close_triples():
            node_id, address, udp_port = contact_triple
            try:
                self._add_active(make_kademlia_peer(node_id, address, udp_port))
            except ValueError:
                log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
                            peer.udp_port, address, udp_port)
        self.check_result_ready(response)
        self._log_state()

    def _reset_closest(self, peer):
        """Drop a peer whose probe failed from ``active``."""
        if peer in self.active:
            del self.active[peer]

    async def _send_probe(self, peer: 'KademliaPeer'):
        """
        Run one probe and handle its outcome.

        Timeouts, invalid replies and remote errors remove the peer from ``active``. A disconnected transport stops
        the whole search. Cancellation is logged and deliberately swallowed, so the probe task ends normally.
        """
        try:
            response = await self.send_probe(peer)
        except asyncio.TimeoutError:
            self._reset_closest(peer)
            return
        except asyncio.CancelledError:
            log.debug("%s[%x] cancelled probe",
                      type(self).__name__, id(self))
            return
        except ValueError as err:
            log.warning(str(err))
            self._reset_closest(peer)
            return
        except TransportNotConnected:
            await self._aclose(reason="not connected")
            return
        except RemoteException:
            self._reset_closest(peer)
            return
        return await self._handle_probe_result(peer, response)

    def _search_round(self):
        """
        Send up to constants.alpha (5) probes to closest active peers
        """

        added = 0
        for index, peer in enumerate(self.active.keys()):
            if index == 0:
                log.debug("%s[%x] closest to probe: %s",
                          type(self).__name__, id(self),
                          peer.node_id.hex()[:8])
            if peer in self.contacted:
                continue
            # cap on concurrently running probes
            if len(self.running_probes) >= constants.ALPHA:
                break
            # only look at roughly the K closest peers (plus those already being probed)
            if index > (constants.K + len(self.running_probes)):
                break
            # enough batches have been produced or queued
            if self.iteration_count + self.iteration_queue.qsize() >= self.max_results:
                break
            origin_address = (peer.address, peer.udp_port)
            if origin_address in self.exclude:
                continue
            if peer.node_id == self.protocol.node_id:
                continue
            if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
                continue
            self._schedule_probe(peer)
            added += 1
        log.debug("%s[%x] running %d probes for key %s",
                  type(self).__name__, id(self),
                  len(self.running_probes), self.key.hex()[:8])
        if not added and not self.running_probes:
            log.debug("%s[%x] search for %s exhausted",
                      type(self).__name__, id(self),
                      self.key.hex()[:8])
            self.search_exhausted()

    def _schedule_probe(self, peer: 'KademliaPeer'):
        """Mark ``peer`` contacted and start a probe task; when the task finishes, run another search round."""
        self.contacted.add(peer)

        t = self.loop.create_task(self._send_probe(peer))

        def callback(_):
            self.running_probes.pop(peer, None)
            if self.running:
                self._search_round()

        t.add_done_callback(callback)
        self.running_probes[peer] = t

    def _log_state(self):
        log.debug("%s[%x] [%s] check result: %i active nodes %i contacted %i produced %i queued",
                  type(self).__name__, id(self), self.key.hex()[:8],
                  len(self.active), len(self.contacted),
                  self.iteration_count, self.iteration_queue.qsize())

    async def _generator_func(self):
        """
        The async generator driving iteration.

        Yields queued result batches until a falsy item is received or ``max_results`` batches have been produced,
        and shuts the search down (``_aclose``) on every exit path.
        """
        try:
            while self.iteration_count < self.max_results:
                if self.iteration_count == 0:
                    result = self.get_initial_result() or await self.iteration_queue.get()
                else:
                    result = await self.iteration_queue.get()
                if not result:
                    # no more results
                    await self._aclose(reason="no more results")
                    self.generator = None
                    return
                self.iteration_count += 1
                yield result
            # reached max_results limit
            await self._aclose(reason="max_results reached")
            self.generator = None
            return
        except asyncio.CancelledError:
            await self._aclose(reason="cancelled")
            self.generator = None
            raise
        except GeneratorExit:
            await self._aclose(reason="generator exit")
            self.generator = None
            raise

    def __aiter__(self):
        if self.running:
            raise Exception("already running")
        self.running = True
        self.generator = self._generator_func()
        self.loop.call_soon(self._search_round)
        return super().__aiter__()

    async def __anext__(self) -> typing.List['KademliaPeer']:
        return await super().__anext__()

    async def asend(self, value):
        if self.generator is None:
            # iteration already finished (or never started): behave like an exhausted async generator instead of
            # failing with AttributeError on None
            raise StopAsyncIteration
        return await self.generator.asend(value)

    async def athrow(self, typ, val=None, tb=None):
        if self.generator is None:
            # same as asend: an exhausted async generator's athrow raises StopAsyncIteration
            raise StopAsyncIteration
        return await self.generator.athrow(typ, val, tb)

    async def _aclose(self, reason="?"):
        """Stop the search: clear ``running`` and cancel every outstanding task and probe."""
        self.running = False
        running_tasks = list(chain(self.tasks, self.running_probes.values()))
        for task in running_tasks:
            task.cancel()
        log.debug("%s[%x] [%s] async close because %s: %i active nodes %i contacted %i produced %i queued",
                  type(self).__name__, id(self), self.key.hex()[:8],
                  reason, len(self.active), len(self.contacted),
                  self.iteration_count, self.iteration_queue.qsize())
        self.tasks.clear()
        self.running_probes.clear()

    async def aclose(self):
        if self.generator:
            await super().aclose()
            self.generator = None
        if self.running or self.running_probes or self.tasks:
            # The generator's own cleanup handlers did not run. This happens when it was never started (aiter()
            # without anext()), or when aclose() is called before iterating at all while seed-node probes
            # scheduled in __init__ are still pending. Stop everything here so no task outlives the finder.
            await self._aclose(reason="aclose")
        log.debug("%s[%x] [%s] async close completed",
                  type(self).__name__, id(self), self.key.hex()[:8])
|
|
|
|
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
class IterativeNodeFinder(IterativeFinder):
    """
    Iterative find_node lookup.

    Yields batches of peers close to ``key``. A batch contains only peers the peer manager considers good, excludes
    this node, and never repeats a peer yielded earlier.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                 routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
                 max_results: typing.Optional[int] = constants.K,
                 exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                 shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
        super().__init__(loop, peer_manager, routing_table, protocol, key,
                         max_results=max_results, exclude=exclude, shortlist=shortlist)
        # every peer already handed out through the iteration queue
        self.yielded_peers: typing.Set['KademliaPeer'] = set()

    async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
        """Ask ``peer`` for the contacts it knows near ``key``."""
        short_id = peer.node_id.hex()[:8] if peer.node_id else ''
        log.debug("probe %s:%d (%s) for NODE %s",
                  peer.address, peer.udp_port, short_id, self.key.hex()[:8])
        rpc_peer = self.protocol.get_rpc_peer(peer)
        return FindNodeResponse(self.key, await rpc_peer.find_node(self.key))

    def search_exhausted(self):
        """Flush whatever active peers remain as a final batch, then end the iteration."""
        self.put_result(self.active, finish=True)

    def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
        """
        Queue the not-yet-yielded good peers from ``from_iter`` as one batch, closest to ``key`` first.

        The batch is truncated to max(constants.K, max_results) peers. When ``finish`` is set, a ``None``
        terminator is queued after it.
        """
        own_node_id = self.protocol.node_id
        fresh = []
        for candidate in from_iter:
            if candidate in self.yielded_peers:
                continue
            if candidate.node_id == own_node_id:
                continue
            if self.peer_manager.peer_is_good(candidate) is not True:
                # return only peers who answered
                continue
            fresh.append(candidate)
        batch = sorted(fresh, key=lambda p: self.distance(p.node_id))[:max(constants.K, self.max_results)]
        if batch:
            self.yielded_peers.update(batch)
            self.iteration_queue.put_nowait(batch)
        if finish:
            self.iteration_queue.put_nowait(None)

    def check_result_ready(self, response: FindNodeResponse):
        """Finish the search as soon as a reply contains ``key`` itself (unless we are searching for our own id)."""
        if not response.found or self.key == self.protocol.node_id:
            return None
        log.debug("found")
        return self.put_result(self.active, finish=True)
|
2019-01-22 18:49:43 +01:00
|
|
|
|
2022-02-08 21:10:33 +01:00
|
|
|
|
2019-01-22 18:49:43 +01:00
|
|
|
class IterativeValueFinder(IterativeFinder):
    """
    Iterative find_value lookup: yields batches of peers that announced the blob ``key``.

    Remote peers page their results. ``peer_pages`` holds the page to request next from each peer. When a peer
    returns a full page and reports more pages, it is removed from ``contacted`` so a later search round probes it
    again for the next page.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                 routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
                 max_results: typing.Optional[int] = constants.K,
                 exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                 shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
                         shortlist)
        # every blob peer already put on the iteration queue (or returned as the initial result)
        self.blob_peers: typing.Set['KademliaPeer'] = set()
        # this tracks the index of the most recent page we requested from each peer
        self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
        # this tracks the set of blob peers returned by each peer
        self.discovered_peers: typing.Dict['KademliaPeer', typing.Set['KademliaPeer']] = defaultdict(set)

    async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
        """
        Request the current page of blob peers for ``key`` from ``peer``.

        A reply with an undecodable compact address gets the peer reported as failed and its addresses discarded.
        A reply that repeats already-known blob peers is logged as misbehaving.
        """
        # Seed nodes from the shortlist are probed before their node_id is known (the base __init__ schedules
        # them directly), so node_id may be None here; calling .hex() on it unguarded raised AttributeError.
        log.debug("probe %s:%d (%s) for VALUE %s",
                  peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '', self.key.hex()[:8])
        page = self.peer_pages[peer]
        response = await self.protocol.get_rpc_peer(peer).find_value(self.key, page=page)
        parsed = FindValueResponse(self.key, response)
        if not parsed.found:
            return parsed
        already_known = len(self.discovered_peers[peer])
        decoded_peers = set()
        for compact_addr in parsed.found_compact_addresses:
            try:
                decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr))
            except ValueError:
                log.warning("misbehaving peer %s:%i returned invalid peer for blob",
                            peer.address, peer.udp_port)
                self.peer_manager.report_failure(peer.address, peer.udp_port)
                # drop the whole reply so check_result_ready does not try to decode it again
                parsed.found_compact_addresses.clear()
                return parsed
        self.discovered_peers[peer].update(decoded_peers)
        log.debug("probed %s:%i page %i, %i known", peer.address, peer.udp_port, page,
                  already_known + len(parsed.found_compact_addresses))
        if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
            log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
        elif len(parsed.found_compact_addresses) >= constants.K and self.peer_pages[peer] < parsed.pages:
            # the peer returned a full page and indicates it has more
            self.peer_pages[peer] += 1
            if peer in self.contacted:
                # the peer must be removed from self.contacted so that it will be probed for the next page
                self.contacted.remove(peer)
        return parsed

    def check_result_ready(self, response: FindValueResponse):
        """Queue the blob peers from ``response`` that have not been yielded yet, as one batch."""
        if response.found:
            blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
                          for compact_addr in response.found_compact_addresses]
            to_yield = []
            for blob_peer in blob_peers:
                if blob_peer not in self.blob_peers:
                    self.blob_peers.add(blob_peer)
                    to_yield.append(blob_peer)
            if to_yield:
                self.iteration_queue.put_nowait(to_yield)

    def get_initial_result(self) -> typing.List['KademliaPeer']:
        """
        Return blob peers already announced to this node's local data store, if any.

        They are recorded in ``blob_peers`` so that check_result_ready does not yield them a second time when remote
        nodes report the same peers.
        """
        if self.protocol.data_store.has_peers_for_blob(self.key):
            peers = list(self.protocol.data_store.get_peers_for_blob(self.key))
            self.blob_peers.update(peers)
            return peers
        return []
|