forked from LBRYCommunity/lbry-sdk
greedy search with exclude filtering
This commit is contained in:
parent
284264962d
commit
c589e89bf0
4 changed files with 33 additions and 14 deletions
|
@ -291,7 +291,7 @@ ADJUSTABLE_SETTINGS = {
|
||||||
'run_reflector_server': (bool, False),
|
'run_reflector_server': (bool, False),
|
||||||
'sd_download_timeout': (int, 3),
|
'sd_download_timeout': (int, 3),
|
||||||
'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY
|
'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY
|
||||||
'peer_search_timeout': (int, 30),
|
'peer_search_timeout': (int, 60),
|
||||||
'use_auth_http': (bool, False),
|
'use_auth_http': (bool, False),
|
||||||
'use_upnp': (bool, True),
|
'use_upnp': (bool, True),
|
||||||
'use_keyring': (bool, False),
|
'use_keyring': (bool, False),
|
||||||
|
|
|
@ -3,6 +3,7 @@ from twisted.internet import defer
|
||||||
from distance import Distance
|
from distance import Distance
|
||||||
from error import TimeoutError
|
from error import TimeoutError
|
||||||
import constants
|
import constants
|
||||||
|
import struct
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -14,10 +15,18 @@ def get_contact(contact_list, node_id, address, port):
|
||||||
raise IndexError(node_id)
|
raise IndexError(node_id)
|
||||||
|
|
||||||
|
|
||||||
|
def expand_peer(compact_peer_info):
|
||||||
|
host = ".".join([str(ord(d)) for d in compact_peer_info[:4]])
|
||||||
|
port, = struct.unpack('>H', compact_peer_info[4:6])
|
||||||
|
peer_node_id = compact_peer_info[6:]
|
||||||
|
return (peer_node_id, host, port)
|
||||||
|
|
||||||
|
|
||||||
class _IterativeFind(object):
|
class _IterativeFind(object):
|
||||||
# TODO: use polymorphism to search for a value or node
|
# TODO: use polymorphism to search for a value or node
|
||||||
# instead of using a find_value flag
|
# instead of using a find_value flag
|
||||||
def __init__(self, node, shortlist, key, rpc):
|
def __init__(self, node, shortlist, key, rpc, exclude=None):
|
||||||
|
self.exclude = set(exclude or [])
|
||||||
self.node = node
|
self.node = node
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
# all distance operations in this class only care about the distance
|
# all distance operations in this class only care about the distance
|
||||||
|
@ -93,8 +102,12 @@ class _IterativeFind(object):
|
||||||
# we are looking for before treating it as a list of contact triples
|
# we are looking for before treating it as a list of contact triples
|
||||||
if self.is_find_value_request and self.key in result:
|
if self.is_find_value_request and self.key in result:
|
||||||
# We have found the value
|
# We have found the value
|
||||||
self.find_value_result[self.key] = result[self.key]
|
for peer in result[self.key]:
|
||||||
self.finished_deferred.callback(self.find_value_result)
|
_, host, port = expand_peer(peer)
|
||||||
|
if (host, port) not in self.exclude:
|
||||||
|
self.find_value_result.setdefault(self.key, []).append(peer)
|
||||||
|
if self.find_value_result:
|
||||||
|
self.finished_deferred.callback(self.find_value_result)
|
||||||
else:
|
else:
|
||||||
if self.is_find_value_request:
|
if self.is_find_value_request:
|
||||||
# We are looking for a value, and the remote node didn't have it
|
# We are looking for a value, and the remote node didn't have it
|
||||||
|
@ -188,8 +201,11 @@ class _IterativeFind(object):
|
||||||
|
|
||||||
elif not self.finished_deferred.called and not self.active_probes or self.should_stop():
|
elif not self.finished_deferred.called and not self.active_probes or self.should_stop():
|
||||||
# If no probes were sent, there will not be any improvement, so we're done
|
# If no probes were sent, there will not be any improvement, so we're done
|
||||||
self.sortByDistance(self.active_contacts)
|
if self.is_find_value_request:
|
||||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
self.finished_deferred.callback(self.find_value_result)
|
||||||
|
else:
|
||||||
|
self.sortByDistance(self.active_contacts)
|
||||||
|
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||||
elif not self.finished_deferred.called:
|
elif not self.finished_deferred.called:
|
||||||
# Force the next iteration
|
# Force the next iteration
|
||||||
self.searchIteration()
|
self.searchIteration()
|
||||||
|
@ -206,7 +222,7 @@ class _IterativeFind(object):
|
||||||
self.pending_iteration_calls.append(cancel)
|
self.pending_iteration_calls.append(cancel)
|
||||||
|
|
||||||
|
|
||||||
def iterativeFind(node, shortlist, key, rpc):
|
def iterativeFind(node, shortlist, key, rpc, exclude=None):
|
||||||
helper = _IterativeFind(node, shortlist, key, rpc)
|
helper = _IterativeFind(node, shortlist, key, rpc, exclude)
|
||||||
helper.searchIteration(0)
|
helper.searchIteration(0)
|
||||||
return helper.finished_deferred
|
return helper.finished_deferred
|
||||||
|
|
|
@ -375,7 +375,7 @@ class Node(MockKademliaHelper):
|
||||||
return self._iterativeFind(key)
|
return self._iterativeFind(key)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def iterativeFindValue(self, key):
|
def iterativeFindValue(self, key, exclude=None):
|
||||||
""" The Kademlia search operation (deterministic)
|
""" The Kademlia search operation (deterministic)
|
||||||
|
|
||||||
Call this to retrieve data from the DHT.
|
Call this to retrieve data from the DHT.
|
||||||
|
@ -400,7 +400,7 @@ class Node(MockKademliaHelper):
|
||||||
raise ValueError("invalid key length!")
|
raise ValueError("invalid key length!")
|
||||||
|
|
||||||
# Execute the search
|
# Execute the search
|
||||||
find_result = yield self._iterativeFind(key, rpc='findValue')
|
find_result = yield self._iterativeFind(key, rpc='findValue', exclude=exclude)
|
||||||
if isinstance(find_result, dict):
|
if isinstance(find_result, dict):
|
||||||
# We have found the value; now see who was the closest contact without it...
|
# We have found the value; now see who was the closest contact without it...
|
||||||
# ...and store the key/value pair
|
# ...and store the key/value pair
|
||||||
|
@ -611,7 +611,7 @@ class Node(MockKademliaHelper):
|
||||||
# from lbrynet.core.utils import profile_deferred
|
# from lbrynet.core.utils import profile_deferred
|
||||||
# @profile_deferred()
|
# @profile_deferred()
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'):
|
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode', exclude=None):
|
||||||
""" The basic Kademlia iterative lookup operation (for nodes/values)
|
""" The basic Kademlia iterative lookup operation (for nodes/values)
|
||||||
|
|
||||||
This builds a list of k "closest" contacts through iterative use of
|
This builds a list of k "closest" contacts through iterative use of
|
||||||
|
@ -659,7 +659,7 @@ class Node(MockKademliaHelper):
|
||||||
# This is used during the bootstrap process
|
# This is used during the bootstrap process
|
||||||
shortlist = startupShortlist
|
shortlist = startupShortlist
|
||||||
|
|
||||||
result = yield iterativeFind(self, shortlist, key, rpc)
|
result = yield iterativeFind(self, shortlist, key, rpc, exclude=exclude)
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
|
@ -28,7 +28,7 @@ class DHTPeerFinder(DummyPeerFinder):
|
||||||
"""
|
"""
|
||||||
self.dht_node = dht_node
|
self.dht_node = dht_node
|
||||||
self.peer_manager = peer_manager
|
self.peer_manager = peer_manager
|
||||||
self.peers = []
|
self.peers = {}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True):
|
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True):
|
||||||
|
@ -42,8 +42,10 @@ class DHTPeerFinder(DummyPeerFinder):
|
||||||
Returns:
|
Returns:
|
||||||
list of peers for the blob
|
list of peers for the blob
|
||||||
"""
|
"""
|
||||||
|
if blob_hash not in self.peers:
|
||||||
|
self.peers[blob_hash] = [(self.dht_node.externalIP, self.dht_node.peerPort)]
|
||||||
bin_hash = binascii.unhexlify(blob_hash)
|
bin_hash = binascii.unhexlify(blob_hash)
|
||||||
finished_deferred = self.dht_node.iterativeFindValue(bin_hash)
|
finished_deferred = self.dht_node.iterativeFindValue(bin_hash, exclude=self.peers[blob_hash])
|
||||||
timeout = timeout or conf.settings['peer_search_timeout']
|
timeout = timeout or conf.settings['peer_search_timeout']
|
||||||
if timeout:
|
if timeout:
|
||||||
finished_deferred.addTimeout(timeout, self.dht_node.clock)
|
finished_deferred.addTimeout(timeout, self.dht_node.clock)
|
||||||
|
@ -60,4 +62,5 @@ class DHTPeerFinder(DummyPeerFinder):
|
||||||
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
||||||
continue
|
continue
|
||||||
results.append(self.peer_manager.get_peer(host, port))
|
results.append(self.peer_manager.get_peer(host, port))
|
||||||
|
self.peers[blob_hash].append((host, port))
|
||||||
defer.returnValue(results)
|
defer.returnValue(results)
|
||||||
|
|
Loading…
Add table
Reference in a new issue