forked from LBRYCommunity/lbry-sdk
Merge remote-tracking branch 'origin/greedy_search'
This commit is contained in:
commit
acb95fdcdf
5 changed files with 34 additions and 15 deletions
|
@ -45,7 +45,7 @@ at anytime.
|
|||
* additional information to the balance error message when editing a claim (https://github.com/lbryio/lbry/pull/1309)
|
||||
* `address` and `port` arguments to `peer_ping` (https://github.com/lbryio/lbry/issues/1313)
|
||||
* ability to download from HTTP mirrors by setting `download_mirrors`
|
||||
*
|
||||
* ability to filter peers from an iterative find value operation (finding peers for a blob). This is used to filter peers we've already found for a blob when accumulating the list of peers.
|
||||
|
||||
### Removed
|
||||
* `session_status` argument and response field from `status`
|
||||
|
|
|
@ -294,7 +294,7 @@ ADJUSTABLE_SETTINGS = {
|
|||
'run_reflector_server': (bool, False), # adds `reflector` to components_to_skip unless True
|
||||
'sd_download_timeout': (int, 3),
|
||||
'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY
|
||||
'peer_search_timeout': (int, 30),
|
||||
'peer_search_timeout': (int, 60),
|
||||
'use_auth_http': (bool, False),
|
||||
'use_upnp': (bool, True),
|
||||
'use_keyring': (bool, False),
|
||||
|
|
|
@ -3,6 +3,7 @@ from twisted.internet import defer
|
|||
from distance import Distance
|
||||
from error import TimeoutError
|
||||
import constants
|
||||
import struct
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -14,10 +15,18 @@ def get_contact(contact_list, node_id, address, port):
|
|||
raise IndexError(node_id)
|
||||
|
||||
|
||||
def expand_peer(compact_peer_info):
|
||||
host = ".".join([str(ord(d)) for d in compact_peer_info[:4]])
|
||||
port, = struct.unpack('>H', compact_peer_info[4:6])
|
||||
peer_node_id = compact_peer_info[6:]
|
||||
return (peer_node_id, host, port)
|
||||
|
||||
|
||||
class _IterativeFind(object):
|
||||
# TODO: use polymorphism to search for a value or node
|
||||
# instead of using a find_value flag
|
||||
def __init__(self, node, shortlist, key, rpc):
|
||||
def __init__(self, node, shortlist, key, rpc, exclude=None):
|
||||
self.exclude = set(exclude or [])
|
||||
self.node = node
|
||||
self.finished_deferred = defer.Deferred()
|
||||
# all distance operations in this class only care about the distance
|
||||
|
@ -93,8 +102,12 @@ class _IterativeFind(object):
|
|||
# we are looking for before treating it as a list of contact triples
|
||||
if self.is_find_value_request and self.key in result:
|
||||
# We have found the value
|
||||
self.find_value_result[self.key] = result[self.key]
|
||||
self.finished_deferred.callback(self.find_value_result)
|
||||
for peer in result[self.key]:
|
||||
_, host, port = expand_peer(peer)
|
||||
if (host, port) not in self.exclude:
|
||||
self.find_value_result.setdefault(self.key, []).append(peer)
|
||||
if self.find_value_result:
|
||||
self.finished_deferred.callback(self.find_value_result)
|
||||
else:
|
||||
if self.is_find_value_request:
|
||||
# We are looking for a value, and the remote node didn't have it
|
||||
|
@ -191,8 +204,11 @@ class _IterativeFind(object):
|
|||
|
||||
elif not self.finished_deferred.called and not self.active_probes or self.should_stop():
|
||||
# If no probes were sent, there will not be any improvement, so we're done
|
||||
self.sortByDistance(self.active_contacts)
|
||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||
if self.is_find_value_request:
|
||||
self.finished_deferred.callback(self.find_value_result)
|
||||
else:
|
||||
self.sortByDistance(self.active_contacts)
|
||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||
elif not self.finished_deferred.called:
|
||||
# Force the next iteration
|
||||
self.searchIteration()
|
||||
|
@ -209,7 +225,7 @@ class _IterativeFind(object):
|
|||
self.pending_iteration_calls.append(cancel)
|
||||
|
||||
|
||||
def iterativeFind(node, shortlist, key, rpc):
|
||||
helper = _IterativeFind(node, shortlist, key, rpc)
|
||||
def iterativeFind(node, shortlist, key, rpc, exclude=None):
|
||||
helper = _IterativeFind(node, shortlist, key, rpc, exclude)
|
||||
helper.searchIteration(0)
|
||||
return helper.finished_deferred
|
||||
|
|
|
@ -379,7 +379,7 @@ class Node(MockKademliaHelper):
|
|||
return self._iterativeFind(key)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def iterativeFindValue(self, key):
|
||||
def iterativeFindValue(self, key, exclude=None):
|
||||
""" The Kademlia search operation (deterministic)
|
||||
|
||||
Call this to retrieve data from the DHT.
|
||||
|
@ -404,7 +404,7 @@ class Node(MockKademliaHelper):
|
|||
raise ValueError("invalid key length!")
|
||||
|
||||
# Execute the search
|
||||
find_result = yield self._iterativeFind(key, rpc='findValue')
|
||||
find_result = yield self._iterativeFind(key, rpc='findValue', exclude=exclude)
|
||||
if isinstance(find_result, dict):
|
||||
# We have found the value; now see who was the closest contact without it...
|
||||
# ...and store the key/value pair
|
||||
|
@ -615,7 +615,7 @@ class Node(MockKademliaHelper):
|
|||
# from lbrynet.core.utils import profile_deferred
|
||||
# @profile_deferred()
|
||||
@defer.inlineCallbacks
|
||||
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'):
|
||||
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode', exclude=None):
|
||||
""" The basic Kademlia iterative lookup operation (for nodes/values)
|
||||
|
||||
This builds a list of k "closest" contacts through iterative use of
|
||||
|
@ -663,7 +663,7 @@ class Node(MockKademliaHelper):
|
|||
# This is used during the bootstrap process
|
||||
shortlist = startupShortlist
|
||||
|
||||
result = yield iterativeFind(self, shortlist, key, rpc)
|
||||
result = yield iterativeFind(self, shortlist, key, rpc, exclude=exclude)
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -28,7 +28,7 @@ class DHTPeerFinder(DummyPeerFinder):
|
|||
"""
|
||||
self.dht_node = dht_node
|
||||
self.peer_manager = peer_manager
|
||||
self.peers = []
|
||||
self.peers = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True):
|
||||
|
@ -42,8 +42,10 @@ class DHTPeerFinder(DummyPeerFinder):
|
|||
Returns:
|
||||
list of peers for the blob
|
||||
"""
|
||||
if blob_hash not in self.peers:
|
||||
self.peers[blob_hash] = [(self.dht_node.externalIP, self.dht_node.peerPort)]
|
||||
bin_hash = binascii.unhexlify(blob_hash)
|
||||
finished_deferred = self.dht_node.iterativeFindValue(bin_hash)
|
||||
finished_deferred = self.dht_node.iterativeFindValue(bin_hash, exclude=self.peers[blob_hash])
|
||||
timeout = timeout or conf.settings['peer_search_timeout']
|
||||
if timeout:
|
||||
finished_deferred.addTimeout(timeout, self.dht_node.clock)
|
||||
|
@ -60,4 +62,5 @@ class DHTPeerFinder(DummyPeerFinder):
|
|||
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
||||
continue
|
||||
results.append(self.peer_manager.get_peer(host, port))
|
||||
self.peers[blob_hash].append((host, port))
|
||||
defer.returnValue(results)
|
||||
|
|
Loading…
Reference in a new issue