diff --git a/lbry/lbry/dht/protocol/iterative_find.py b/lbry/lbry/dht/protocol/iterative_find.py
index 748324ad4..11665f83c 100644
--- a/lbry/lbry/dht/protocol/iterative_find.py
+++ b/lbry/lbry/dht/protocol/iterative_find.py
@@ -1,13 +1,14 @@
 import asyncio
 from binascii import hexlify
 from itertools import chain
-
+from collections import defaultdict
 import typing
 import logging
 from lbry.dht import constants
 from lbry.dht.error import RemoteException, TransportNotConnected
 from lbry.dht.protocol.distance import Distance
 from lbry.dht.peer import make_kademlia_peer
+from lbry.dht.serialization.datagram import PAGE_KEY
 
 from typing import TYPE_CHECKING
 if TYPE_CHECKING:
@@ -46,6 +47,7 @@ class FindValueResponse(FindResponse):
         self.token = result_dict[b'token']
         self.close_triples: typing.List[typing.Tuple[bytes, bytes, int]] = result_dict.get(b'contacts', [])
         self.found_compact_addresses = result_dict.get(key, [])
+        self.pages = int(result_dict.get(PAGE_KEY, 0))
 
     @property
     def found(self) -> bool:
@@ -314,10 +316,43 @@ class IterativeValueFinder(IterativeFinder):
         super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
                          shortlist)
         self.blob_peers: typing.Set['KademliaPeer'] = set()
+        # this tracks the index of the most recent page we requested from each peer
+        self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
+        # this tracks the set of blob peers returned by each peer
+        self.discovered_peers: typing.DefaultDict['KademliaPeer', typing.Set['KademliaPeer']] = defaultdict(set)
 
     async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
-        response = await self.protocol.get_rpc_peer(peer).find_value(self.key)
-        return FindValueResponse(self.key, response)
+        """
+        Request the page of blob peers for self.key that is currently tracked for the given peer.
+
+        A full page of new blob peers from a peer that reports more pages advances that peer's page index
+        and removes it from self.contacted so that it is probed again. A response whose blob peers overlap
+        (with each other or with earlier pages from the same peer) is logged and its found addresses cleared.
+        """
+        page = self.peer_pages[peer]
+        response = await self.protocol.get_rpc_peer(peer).find_value(self.key, page=page)
+        parsed = FindValueResponse(self.key, response)
+        if not parsed.found:
+            return parsed
+        already_known = len(self.discovered_peers[peer])
+        self.discovered_peers[peer].update({
+            self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
+            for compact_addr in parsed.found_compact_addresses
+        })
+        log.debug("probed %s:%i page %i, %i known", peer.address, peer.udp_port, page,
+                  already_known + len(parsed.found_compact_addresses))
+        if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
+            log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
+            parsed.found_compact_addresses.clear()
+        elif len(parsed.found_compact_addresses) >= constants.k and self.peer_pages[peer] < parsed.pages:
+            # the peer returned a full page and indicates it has more
+            # NOTE(review): if `pages` is a count and indexes start at 0, this can request one page past the end
+            # (when the last page is exactly full) - confirm against the find_value handler
+            self.peer_pages[peer] += 1
+            if peer in self.contacted:
+                # the peer must be removed from self.contacted so that it will be probed for the next page
+                self.contacted.remove(peer)
+        return parsed
 
     def check_result_ready(self, response: FindValueResponse):
         if response.found: