diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 14fa45b53..c0dc7f21a 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -291,7 +291,7 @@ ADJUSTABLE_SETTINGS = { 'run_reflector_server': (bool, False), 'sd_download_timeout': (int, 3), 'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY - 'peer_search_timeout': (int, 30), + 'peer_search_timeout': (int, 60), 'use_auth_http': (bool, False), 'use_upnp': (bool, True), 'use_keyring': (bool, False), diff --git a/lbrynet/dht/iterativefind.py b/lbrynet/dht/iterativefind.py index 5b0e9c992..707ea971d 100644 --- a/lbrynet/dht/iterativefind.py +++ b/lbrynet/dht/iterativefind.py @@ -3,6 +3,7 @@ from twisted.internet import defer from distance import Distance from error import TimeoutError import constants +import struct log = logging.getLogger(__name__) @@ -14,10 +15,18 @@ def get_contact(contact_list, node_id, address, port): raise IndexError(node_id) +def expand_peer(compact_peer_info): + host = ".".join([str(ord(d)) for d in compact_peer_info[:4]]) + port, = struct.unpack('>H', compact_peer_info[4:6]) + peer_node_id = compact_peer_info[6:] + return (peer_node_id, host, port) + + class _IterativeFind(object): # TODO: use polymorphism to search for a value or node # instead of using a find_value flag - def __init__(self, node, shortlist, key, rpc): + def __init__(self, node, shortlist, key, rpc, exclude=None): + self.exclude = set(exclude or []) self.node = node self.finished_deferred = defer.Deferred() # all distance operations in this class only care about the distance @@ -93,8 +102,12 @@ class _IterativeFind(object): # we are looking for before treating it as a list of contact triples if self.is_find_value_request and self.key in result: # We have found the value - self.find_value_result[self.key] = result[self.key] - self.finished_deferred.callback(self.find_value_result) + for peer in result[self.key]: + _, host, port = expand_peer(peer) + if (host, port) not in self.exclude: + self.find_value_result.setdefault(self.key, []).append(peer) + if self.find_value_result: + self.finished_deferred.callback(self.find_value_result) else: if self.is_find_value_request: # We are looking for a value, and the remote node didn't have it @@ -188,8 +201,11 @@ class _IterativeFind(object): elif not self.finished_deferred.called and not self.active_probes or self.should_stop(): # If no probes were sent, there will not be any improvement, so we're done - self.sortByDistance(self.active_contacts) - self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) + if self.is_find_value_request: + self.finished_deferred.callback(self.find_value_result) + else: + self.sortByDistance(self.active_contacts) + self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) elif not self.finished_deferred.called: # Force the next iteration self.searchIteration() @@ -206,7 +222,7 @@ class _IterativeFind(object): self.pending_iteration_calls.append(cancel) -def iterativeFind(node, shortlist, key, rpc): - helper = _IterativeFind(node, shortlist, key, rpc) +def iterativeFind(node, shortlist, key, rpc, exclude=None): + helper = _IterativeFind(node, shortlist, key, rpc, exclude) helper.searchIteration(0) return helper.finished_deferred diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index bc9735d66..73f5a9916 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -375,7 +375,7 @@ class Node(MockKademliaHelper): return self._iterativeFind(key) @defer.inlineCallbacks - def iterativeFindValue(self, key): + def iterativeFindValue(self, key, exclude=None): """ The Kademlia search operation (deterministic) Call this to retrieve data from the DHT. @@ -400,7 +400,7 @@ class Node(MockKademliaHelper): raise ValueError("invalid key length!") # Execute the search - find_result = yield self._iterativeFind(key, rpc='findValue') + find_result = yield self._iterativeFind(key, rpc='findValue', exclude=exclude) if isinstance(find_result, dict): # We have found the value; now see who was the closest contact without it... # ...and store the key/value pair @@ -611,7 +611,7 @@ class Node(MockKademliaHelper): # from lbrynet.core.utils import profile_deferred # @profile_deferred() @defer.inlineCallbacks - def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'): + def _iterativeFind(self, key, startupShortlist=None, rpc='findNode', exclude=None): """ The basic Kademlia iterative lookup operation (for nodes/values) This builds a list of k "closest" contacts through iterative use of @@ -659,7 +659,7 @@ class Node(MockKademliaHelper): # This is used during the bootstrap process shortlist = startupShortlist - result = yield iterativeFind(self, shortlist, key, rpc) + result = yield iterativeFind(self, shortlist, key, rpc, exclude=exclude) defer.returnValue(result) @defer.inlineCallbacks diff --git a/lbrynet/dht/peerfinder.py b/lbrynet/dht/peerfinder.py index 3b51f7a4e..6c26502f4 100644 --- a/lbrynet/dht/peerfinder.py +++ b/lbrynet/dht/peerfinder.py @@ -28,7 +28,7 @@ class DHTPeerFinder(DummyPeerFinder): """ self.dht_node = dht_node self.peer_manager = peer_manager - self.peers = [] + self.peers = {} @defer.inlineCallbacks def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True): @@ -42,8 +42,10 @@ class DHTPeerFinder(DummyPeerFinder): Returns: list of peers for the blob """ + if blob_hash not in self.peers: + self.peers[blob_hash] = [(self.dht_node.externalIP, self.dht_node.peerPort)] bin_hash = binascii.unhexlify(blob_hash) - finished_deferred = self.dht_node.iterativeFindValue(bin_hash) + finished_deferred = self.dht_node.iterativeFindValue(bin_hash, exclude=self.peers[blob_hash]) timeout = timeout or conf.settings['peer_search_timeout'] if timeout: finished_deferred.addTimeout(timeout, self.dht_node.clock) @@ -60,4 +62,5 @@ class DHTPeerFinder(DummyPeerFinder): if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort): continue results.append(self.peer_manager.get_peer(host, port)) + self.peers[blob_hash].append((host, port)) defer.returnValue(results)