forked from LBRYCommunity/lbry-sdk
make peer search asynchronous
This commit is contained in:
parent
234eaf8890
commit
24a11d3f21
1 changed files with 24 additions and 22 deletions
|
@ -27,8 +27,8 @@ class DHTPeerFinder(DummyPeerFinder):
|
|||
self.dht_node = dht_node
|
||||
self.peer_manager = peer_manager
|
||||
self.peers = {}
|
||||
self._ongoing_searchs = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True):
|
||||
"""
|
||||
Find peers for blob in the DHT
|
||||
|
@ -40,25 +40,27 @@ class DHTPeerFinder(DummyPeerFinder):
|
|||
Returns:
|
||||
list of peers for the blob
|
||||
"""
|
||||
if blob_hash not in self.peers:
|
||||
self.peers[blob_hash] = [(self.dht_node.externalIP, self.dht_node.peerPort)]
|
||||
self.peers.setdefault(blob_hash, {(self.dht_node.externalIP, self.dht_node.peerPort,)})
|
||||
if not blob_hash in self._ongoing_searchs:
|
||||
self._ongoing_searchs[blob_hash] = self._execute_peer_search(blob_hash, timeout)
|
||||
peers = set(self._filter_self(blob_hash) if filter_self else self.peers[blob_hash])
|
||||
return defer.succeed([self.peer_manager.get_peer(*peer) for peer in peers])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _execute_peer_search(self, blob_hash, timeout):
|
||||
try:
|
||||
bin_hash = binascii.unhexlify(blob_hash)
|
||||
finished_deferred = self.dht_node.iterativeFindValue(bin_hash, exclude=self.peers[blob_hash])
|
||||
timeout = timeout or conf.settings['peer_search_timeout']
|
||||
if timeout:
|
||||
finished_deferred.addTimeout(timeout, self.dht_node.clock)
|
||||
try:
|
||||
peer_list = yield finished_deferred
|
||||
self.peers[blob_hash].update(set((host, port) for _, host, port in peer_list))
|
||||
except defer.TimeoutError:
|
||||
log.debug("DHT timed out while looking peers for blob %s after %s seconds",
|
||||
blob_hash, timeout)
|
||||
peer_list = []
|
||||
log.debug("DHT timed out while looking peers for blob %s after %s seconds", blob_hash, timeout)
|
||||
finally:
|
||||
del self._ongoing_searchs[blob_hash]
|
||||
|
||||
peers = set(peer_list)
|
||||
results = []
|
||||
for node_id, host, port in peers:
|
||||
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
||||
continue
|
||||
results.append(self.peer_manager.get_peer(host, port))
|
||||
self.peers[blob_hash].append((host, port))
|
||||
defer.returnValue(results)
|
||||
def _filter_self(self, blob_hash):
|
||||
my_host, my_port = self.dht_node.externalIP, self.dht_node.peerPort
|
||||
return set((host, port) for host, port in self.peers[blob_hash] if (host, port) != (my_host, my_port))
|
||||
|
|
Loading…
Reference in a new issue