drain all pages of peers returned by find_value in IterativeValueFinder

This commit is contained in:
Jack Robison 2019-11-26 14:48:30 -05:00
parent ea639e64a4
commit 880aa265f1
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -1,13 +1,14 @@
import asyncio
from binascii import hexlify
from itertools import chain
from collections import defaultdict
import typing
import logging
from lbry.dht import constants
from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.distance import Distance
from lbry.dht.peer import make_kademlia_peer
from lbry.dht.serialization.datagram import PAGE_KEY
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@ -46,6 +47,7 @@ class FindValueResponse(FindResponse):
self.token = result_dict[b'token']
self.close_triples: typing.List[typing.Tuple[bytes, bytes, int]] = result_dict.get(b'contacts', [])
self.found_compact_addresses = result_dict.get(key, [])
self.pages = int(result_dict.get(PAGE_KEY, 0))
@property
def found(self) -> bool:
@ -314,10 +316,34 @@ class IterativeValueFinder(IterativeFinder):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
# this tracks the set of blob peers returned by each peer
self.discovered_peers: typing.Dict['KademliaPeer', typing.Set['KademliaPeer']] = defaultdict(set)
async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
response = await self.protocol.get_rpc_peer(peer).find_value(self.key)
return FindValueResponse(self.key, response)
page = self.peer_pages[peer]
response = await self.protocol.get_rpc_peer(peer).find_value(self.key, page=page)
parsed = FindValueResponse(self.key, response)
if not parsed.found:
return parsed
already_known = len(self.discovered_peers[peer])
self.discovered_peers[peer].update({
self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in parsed.found_compact_addresses
})
log.debug("probed %s:%i page %i, %i known", peer.address, peer.udp_port, page,
already_known + len(parsed.found_compact_addresses))
if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
parsed.found_compact_addresses.clear()
elif len(parsed.found_compact_addresses) >= constants.k and self.peer_pages[peer] < parsed.pages:
# the peer returned a full page and indicates it has more
self.peer_pages[peer] += 1
if peer in self.contacted:
# the peer must be removed from self.contacted so that it will be probed for the next page
self.contacted.remove(peer)
return parsed
def check_result_ready(self, response: FindValueResponse):
if response.found: