lbry-sdk/lbry/dht/protocol/iterative_find.py

419 lines
18 KiB
Python
Raw Normal View History

2019-01-22 12:49:43 -05:00
import asyncio
2019-01-31 01:13:01 -03:00
from itertools import chain
from collections import defaultdict
2019-01-22 12:49:43 -05:00
import typing
import logging
2020-01-03 01:31:28 -03:00
from typing import TYPE_CHECKING
2019-06-20 20:55:47 -04:00
from lbry.dht import constants
from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.distance import Distance
2019-09-30 21:00:10 -03:00
from lbry.dht.peer import make_kademlia_peer
from lbry.dht.serialization.datagram import PAGE_KEY
2019-01-22 12:49:43 -05:00
if TYPE_CHECKING:
2019-06-20 20:55:47 -04:00
from lbry.dht.protocol.routing_table import TreeRoutingTable
from lbry.dht.protocol.protocol import KademliaProtocol
from lbry.dht.peer import PeerManager, KademliaPeer
2019-01-22 12:49:43 -05:00
log = logging.getLogger(__name__)
class FindResponse:
@property
def found(self) -> bool:
raise NotImplementedError()
def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
raise NotImplementedError()
class FindNodeResponse(FindResponse):
def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
self.key = key
self.close_triples = close_triples
@property
def found(self) -> bool:
return self.key in [triple[0] for triple in self.close_triples]
def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
return self.close_triples
class FindValueResponse(FindResponse):
def __init__(self, key: bytes, result_dict: typing.Dict):
self.key = key
self.token = result_dict[b'token']
self.close_triples: typing.List[typing.Tuple[bytes, bytes, int]] = result_dict.get(b'contacts', [])
self.found_compact_addresses = result_dict.get(key, [])
self.pages = int(result_dict.get(PAGE_KEY, 0))
2019-01-22 12:49:43 -05:00
@property
def found(self) -> bool:
return len(self.found_compact_addresses) > 0
def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
return [(node_id, address.decode(), port) for node_id, address, port in self.close_triples]
def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
shortlist: typing.Optional[typing.List['KademliaPeer']]) -> typing.List['KademliaPeer']:
"""
If not provided, initialize the shortlist of peers to probe to the (up to) k closest peers in the routing table
:param routing_table: a TreeRoutingTable
:param key: a 48 byte hash
:param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no
peers in the routing table. During bootstrap the shortlist is set to be the seed nodes.
"""
2020-01-03 00:57:28 -03:00
if len(key) != constants.HASH_LENGTH:
2019-01-22 12:49:43 -05:00
raise ValueError("invalid key length: %i" % len(key))
return shortlist or routing_table.find_close_peers(key)
2019-01-22 12:49:43 -05:00
class IterativeFinder:
2019-08-02 13:14:41 -04:00
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
2019-01-22 12:49:43 -05:00
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
2020-01-03 00:57:28 -03:00
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
2019-01-22 12:49:43 -05:00
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
2020-01-03 00:57:28 -03:00
if len(key) != constants.HASH_LENGTH:
2019-01-22 12:49:43 -05:00
raise ValueError("invalid key length: %i" % len(key))
self.loop = loop
self.peer_manager = peer_manager
self.routing_table = routing_table
self.protocol = protocol
self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.exclude = exclude or []
2019-05-13 04:34:39 -03:00
self.active: typing.Set['KademliaPeer'] = set()
self.contacted: typing.Set['KademliaPeer'] = set()
2019-01-22 12:49:43 -05:00
self.distance = Distance(key)
self.closest_peer: typing.Optional['KademliaPeer'] = None
2019-01-22 12:49:43 -05:00
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
self.iteration_queue = asyncio.Queue(loop=self.loop)
2019-01-31 01:13:01 -03:00
self.running_probes: typing.Set[asyncio.Task] = set()
2019-01-22 12:49:43 -05:00
self.iteration_count = 0
self.bottom_out_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
2019-05-13 04:34:39 -03:00
for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id:
self._add_active(peer, force=True)
2019-05-13 04:34:39 -03:00
else:
# seed nodes
self._schedule_probe(peer)
2019-01-22 12:49:43 -05:00
@property
def is_closest_peer_ready(self):
if not self.closest_peer or not self.prev_closest_peer:
return False
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False
2019-01-22 12:49:43 -05:00
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
"""
Send the rpc request to the peer and return an object with the FindResponse interface
"""
raise NotImplementedError()
2019-01-31 01:13:01 -03:00
def search_exhausted(self):
"""
This method ends the iterator due no more peers to contact.
Override to provide last time results.
"""
self.iteration_queue.put_nowait(None)
2019-01-22 12:49:43 -05:00
def check_result_ready(self, response: FindResponse):
"""
2019-01-31 01:13:01 -03:00
Called after adding peers from an rpc result to the shortlist.
2019-01-22 12:49:43 -05:00
This method is responsible for putting a result for the generator into the Queue
"""
raise NotImplementedError()
2020-01-03 01:31:28 -03:00
def get_initial_result(self) -> typing.List['KademliaPeer']: #pylint: disable=no-self-use
2019-01-22 12:49:43 -05:00
"""
Get an initial or cached result to be put into the Queue. Used for findValue requests where the blob
has peers in the local data store of blobs announced to us
"""
return []
def _is_closer(self, peer: 'KademliaPeer') -> bool:
2019-01-31 01:13:01 -03:00
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
2019-01-22 12:49:43 -05:00
def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
2022-01-25 17:00:37 -03:00
return
if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
log.debug("[%s] closest peer went bad", self.key.hex()[:8])
if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
self.closest_peer = self.prev_closest_peer
else:
self.closest_peer = None
self.prev_closest_peer = None
2019-05-13 04:34:39 -03:00
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
2019-01-22 12:49:43 -05:00
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
2019-05-13 04:34:39 -03:00
self._add_active(peer)
2019-01-31 01:13:01 -03:00
for contact_triple in response.get_close_triples():
node_id, address, udp_port = contact_triple
2019-11-29 15:28:41 -05:00
try:
self._add_active(make_kademlia_peer(node_id, address, udp_port))
except ValueError:
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
peer.udp_port, address, udp_port)
2019-01-31 01:13:01 -03:00
self.check_result_ready(response)
self._log_state()
2019-01-22 12:49:43 -05:00
2022-02-07 21:46:43 -03:00
def _reset_closest(self, peer):
if peer == self.prev_closest_peer:
self.prev_closest_peer = None
if peer == self.closest_peer:
self.closest_peer = self.prev_closest_peer
2019-01-22 12:49:43 -05:00
async def _send_probe(self, peer: 'KademliaPeer'):
try:
response = await self.send_probe(peer)
except asyncio.TimeoutError:
2022-02-07 21:46:43 -03:00
self._reset_closest(peer)
2019-05-13 04:34:39 -03:00
self.active.discard(peer)
2019-01-22 12:49:43 -05:00
return
except ValueError as err:
log.warning(str(err))
2022-02-07 21:46:43 -03:00
self._reset_closest(peer)
2019-05-13 04:34:39 -03:00
self.active.discard(peer)
2019-01-22 12:49:43 -05:00
return
2019-05-08 23:00:57 -03:00
except TransportNotConnected:
return self.aclose()
except RemoteException:
2019-01-22 12:49:43 -05:00
return
return await self._handle_probe_result(peer, response)
async def _search_round(self):
"""
2019-05-13 04:34:39 -03:00
Send up to constants.alpha (5) probes to closest active peers
2019-01-22 12:49:43 -05:00
"""
added = 0
2019-05-13 04:34:39 -03:00
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
2019-05-13 04:34:39 -03:00
for peer in to_probe:
2020-01-03 00:57:28 -03:00
if added >= constants.ALPHA:
break
2019-01-31 01:13:01 -03:00
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
2019-01-31 01:13:01 -03:00
continue
if peer.node_id == self.protocol.node_id:
continue
if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
2019-01-31 01:13:01 -03:00
continue
2019-05-13 04:34:39 -03:00
self._schedule_probe(peer)
added += 1
log.debug("running %d probes for key %s", len(self.running_probes), self.key.hex()[:8])
2019-01-31 01:13:01 -03:00
if not added and not self.running_probes:
log.debug("search for %s exhausted", self.key.hex()[:8])
2019-01-31 01:13:01 -03:00
self.search_exhausted()
2019-01-22 12:49:43 -05:00
2019-05-13 04:34:39 -03:00
def _schedule_probe(self, peer: 'KademliaPeer'):
self.contacted.add(peer)
t = self.loop.create_task(self._send_probe(peer))
def callback(_):
self.running_probes.difference_update({
probe for probe in self.running_probes if probe.done() or probe == t
})
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
t.add_done_callback(callback)
self.running_probes.add(t)
2020-01-03 00:57:28 -03:00
async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
2019-01-22 12:49:43 -05:00
try:
if self.running:
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
2019-04-10 10:26:57 -04:00
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
2019-01-22 12:49:43 -05:00
if self.running:
2019-01-31 01:13:01 -03:00
self.loop.call_soon(self.aclose)
2019-01-22 12:49:43 -05:00
def _log_state(self):
log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
if self.closest_peer and self.prev_closest_peer:
log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
2019-01-22 12:49:43 -05:00
def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
2019-01-31 01:13:01 -03:00
def __aiter__(self):
2019-01-22 12:49:43 -05:00
if self.running:
raise Exception("already running")
self.running = True
self._search()
return self
async def __anext__(self) -> typing.List['KademliaPeer']:
try:
if self.iteration_count == 0:
2019-01-31 01:13:01 -03:00
result = self.get_initial_result() or await self.iteration_queue.get()
else:
result = await self.iteration_queue.get()
if not result:
raise StopAsyncIteration
2019-01-22 12:49:43 -05:00
self.iteration_count += 1
return result
except (asyncio.CancelledError, StopAsyncIteration):
2019-01-31 01:13:01 -03:00
self.loop.call_soon(self.aclose)
2019-01-22 12:49:43 -05:00
raise
def aclose(self):
self.running = False
2019-01-31 01:13:01 -03:00
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()
2019-01-22 12:49:43 -05:00
class IterativeNodeFinder(IterativeFinder):
2019-08-02 13:14:41 -04:00
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
2019-01-22 12:49:43 -05:00
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
2020-01-03 00:57:28 -03:00
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
2019-01-22 12:49:43 -05:00
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()
async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
log.debug("probe %s:%d (%s) for NODE %s",
peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '', self.key.hex()[:8])
2019-01-22 12:49:43 -05:00
response = await self.protocol.get_rpc_peer(peer).find_node(self.key)
return FindNodeResponse(self.key, response)
2019-01-31 01:13:01 -03:00
def search_exhausted(self):
self.put_result(self.active, finish=True)
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
peer for peer in from_iter
if peer not in self.yielded_peers
2020-01-03 01:31:28 -03:00
and peer.node_id != self.protocol.node_id
and self.peer_manager.peer_is_good(peer) is not False
]
2019-01-22 12:49:43 -05:00
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
2019-01-22 12:49:43 -05:00
if to_yield:
self.yielded_peers.update(to_yield)
2019-01-22 12:49:43 -05:00
self.iteration_queue.put_nowait(to_yield)
2019-01-31 01:13:01 -03:00
if finish:
self.iteration_queue.put_nowait(None)
2019-01-22 12:49:43 -05:00
def check_result_ready(self, response: FindNodeResponse):
found = response.found and self.key != self.protocol.node_id
if found:
2019-02-01 18:13:45 -05:00
log.debug("found")
return self.put_result(self.active, finish=True)
elif self.is_closest_peer_ready:
2019-01-22 12:49:43 -05:00
self.bottom_out_count += 1
else:
self.bottom_out_count = 0
2019-01-22 12:49:43 -05:00
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.debug("limit hit")
2019-01-31 01:13:01 -03:00
self.put_result(self.active, True)
2019-01-22 12:49:43 -05:00
class IterativeValueFinder(IterativeFinder):
2019-08-02 13:14:41 -04:00
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
2019-01-22 12:49:43 -05:00
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
2020-01-03 00:57:28 -03:00
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
2019-01-22 12:49:43 -05:00
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
# this tracks the set of blob peers returned by each peer
self.discovered_peers: typing.Dict['KademliaPeer', typing.Set['KademliaPeer']] = defaultdict(set)
2019-01-22 12:49:43 -05:00
async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
log.debug("probe %s:%d (%s) for VALUE %s",
peer.address, peer.udp_port, peer.node_id.hex()[:8], self.key.hex()[:8])
page = self.peer_pages[peer]
response = await self.protocol.get_rpc_peer(peer).find_value(self.key, page=page)
parsed = FindValueResponse(self.key, response)
if not parsed.found:
return parsed
already_known = len(self.discovered_peers[peer])
2019-11-29 15:28:41 -05:00
decoded_peers = set()
for compact_addr in parsed.found_compact_addresses:
try:
decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr))
except ValueError:
log.warning("misbehaving peer %s:%i returned invalid peer for blob",
peer.address, peer.udp_port)
self.peer_manager.report_failure(peer.address, peer.udp_port)
parsed.found_compact_addresses.clear()
return parsed
self.discovered_peers[peer].update(decoded_peers)
log.debug("probed %s:%i page %i, %i known", peer.address, peer.udp_port, page,
already_known + len(parsed.found_compact_addresses))
if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
2020-01-03 00:57:28 -03:00
elif len(parsed.found_compact_addresses) >= constants.K and self.peer_pages[peer] < parsed.pages:
# the peer returned a full page and indicates it has more
self.peer_pages[peer] += 1
if peer in self.contacted:
# the peer must be removed from self.contacted so that it will be probed for the next page
self.contacted.remove(peer)
return parsed
2019-01-22 12:49:43 -05:00
def check_result_ready(self, response: FindValueResponse):
if response.found:
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers:
if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer)
to_yield.append(blob_peer)
if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.is_closest_peer_ready:
2019-01-22 12:49:43 -05:00
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
2019-01-31 01:13:01 -03:00
self.iteration_queue.put_nowait(None)
2019-01-22 12:49:43 -05:00
def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key):
return self.protocol.data_store.get_peers_for_blob(self.key)
return []