Merge pull request #2647 from lbryio/drain-iterative-findvalue

Request all pages of peers for a blob
This commit is contained in:
Jack Robison 2019-12-01 15:52:28 -05:00 committed by GitHub
commit 2277d134cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,13 +1,14 @@
import asyncio
from binascii import hexlify
from itertools import chain
from collections import defaultdict
import typing
import logging
from lbry.dht import constants
from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.distance import Distance
from lbry.dht.peer import make_kademlia_peer
from lbry.dht.serialization.datagram import PAGE_KEY
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@ -46,6 +47,7 @@ class FindValueResponse(FindResponse):
self.token = result_dict[b'token']
self.close_triples: typing.List[typing.Tuple[bytes, bytes, int]] = result_dict.get(b'contacts', [])
self.found_compact_addresses = result_dict.get(key, [])
self.pages = int(result_dict.get(PAGE_KEY, 0))
@property
def found(self) -> bool:
@ -314,10 +316,34 @@ class IterativeValueFinder(IterativeFinder):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
# this tracks the set of blob peers returned by each peer
self.discovered_peers: typing.Dict['KademliaPeer', typing.Set['KademliaPeer']] = defaultdict(set)
async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
response = await self.protocol.get_rpc_peer(peer).find_value(self.key)
return FindValueResponse(self.key, response)
page = self.peer_pages[peer]
response = await self.protocol.get_rpc_peer(peer).find_value(self.key, page=page)
parsed = FindValueResponse(self.key, response)
if not parsed.found:
return parsed
already_known = len(self.discovered_peers[peer])
self.discovered_peers[peer].update({
self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in parsed.found_compact_addresses
})
log.debug("probed %s:%i page %i, %i known", peer.address, peer.udp_port, page,
already_known + len(parsed.found_compact_addresses))
if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
parsed.found_compact_addresses.clear()
elif len(parsed.found_compact_addresses) >= constants.k and self.peer_pages[peer] < parsed.pages:
# the peer returned a full page and indicates it has more
self.peer_pages[peer] += 1
if peer in self.contacted:
# the peer must be removed from self.contacted so that it will be probed for the next page
self.contacted.remove(peer)
return parsed
def check_result_ready(self, response: FindValueResponse):
if response.found: