diff --git a/lbrynet/conf.py b/lbrynet/conf.py
index 020e1447d..a411f0f7a 100644
--- a/lbrynet/conf.py
+++ b/lbrynet/conf.py
@@ -197,6 +197,7 @@ ADJUSTABLE_SETTINGS = {
     'run_on_startup': (bool, False),
     'run_reflector_server': (bool, False),
     'sd_download_timeout': (int, 3),
+    'peer_search_timeout': (int, 3),
     'search_servers': (list, ['lighthouse1.lbry.io:50005']),
     'search_timeout': (float, 5.0),
     'startup_scripts': (list, []),
diff --git a/lbrynet/core/BlobAvailability.py b/lbrynet/core/BlobAvailability.py
index a15b625db..8175701fe 100644
--- a/lbrynet/core/BlobAvailability.py
+++ b/lbrynet/core/BlobAvailability.py
@@ -38,17 +38,17 @@ class BlobAvailabilityTracker(object):
         if self._check_mine.running:
             self._check_mine.stop()
 
-    def get_blob_availability(self, blob):
+    def get_blob_availability(self, blob, timeout=None):
         def _get_peer_count(peers):
             have_blob = sum(1 for peer in peers if peer.is_available())
             return {blob: have_blob}
-        d = self._peer_finder.find_peers_for_blob(blob)
+        d = self._peer_finder.find_peers_for_blob(blob, timeout)
         d.addCallback(_get_peer_count)
         return d
 
-    def get_availability_for_blobs(self, blobs):
-        dl = [self.get_blob_availability(blob) for blob in blobs if blob]
+    def get_availability_for_blobs(self, blobs, timeout=None):
+        dl = [self.get_blob_availability(blob, timeout) for blob in blobs if blob]
         d = defer.DeferredList(dl)
         d.addCallback(lambda results: [val for success, val in results if success])
         return d
 
@@ -57,7 +57,6 @@ class BlobAvailabilityTracker(object):
     def last_mean_availability(self):
         return max(Decimal(0.01), self._last_mean_availability)
 
-
     def _update_peers_for_blob(self, blob):
         def _save_peer_info(blob_hash, peers):
             v = {blob_hash: peers}
diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py
index d4f299d8e..7970a194a 100644
--- a/lbrynet/core/client/DHTPeerFinder.py
+++ b/lbrynet/core/client/DHTPeerFinder.py
@@ -2,7 +2,9 @@
 import binascii
 import logging
 
 from zope.interface import implements
+from twisted.internet import defer, reactor
 from lbrynet.interfaces import IPeerFinder
+from lbrynet.core.utils import short_hash
 
 log = logging.getLogger(__name__)
@@ -34,21 +36,29 @@ class DHTPeerFinder(object):
     def _manage_peers(self):
         pass
 
-    def find_peers_for_blob(self, blob_hash):
+    @defer.inlineCallbacks
+    def find_peers_for_blob(self, blob_hash, timeout=None):
+        def _trigger_timeout():
+            if not finished_deferred.called:
+                log.warning("Peer search for %s timed out", short_hash(blob_hash))
+                finished_deferred.cancel()
+
         bin_hash = binascii.unhexlify(blob_hash)
+        finished_deferred = self.dht_node.getPeersForBlob(bin_hash)
 
-        def filter_peers(peer_list):
-            peers = set(peer_list)
-            good_peers = []
-            for host, port in peers:
-                peer = self.peer_manager.get_peer(host, port)
-                if peer.is_available() is True:
-                    good_peers.append(peer)
-            return good_peers
+        if timeout is not None:
+            reactor.callLater(timeout, _trigger_timeout)
 
-        d = self.dht_node.getPeersForBlob(bin_hash)
-        d.addCallback(filter_peers)
-        return d
+        peer_list = yield finished_deferred
+
+        peers = set(peer_list)
+        good_peers = []
+        for host, port in peers:
+            peer = self.peer_manager.get_peer(host, port)
+            if peer.is_available() is True:
+                good_peers.append(peer)
+
+        defer.returnValue(good_peers)
 
     def get_most_popular_hashes(self, num_to_return):
         return self.dht_node.get_most_popular_hashes(num_to_return)
diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py
index 82d90a278..1bf47a1a2 100644
--- a/lbrynet/lbrynet_daemon/Daemon.py
+++ b/lbrynet/lbrynet_daemon/Daemon.py
@@ -10,6 +10,7 @@ import simplejson as json
 import textwrap
 from requests import exceptions as requests_exceptions
 from decimal import Decimal
+import random
 
 from twisted.web import server
 from twisted.internet import defer, threads, error, reactor, task
@@ -2157,17 +2158,20 @@
         """
         return self.jsonrpc_peer_list(blob_hash)
 
-    def jsonrpc_peer_list(self, blob_hash):
+    def jsonrpc_peer_list(self, blob_hash, timeout=None):
         """
         Get peers for blob hash
 
         Args:
             'blob_hash': blob hash
+            'timeout' (int, optional): peer search timeout
         Returns:
             List of contacts
         """
 
-        d = self.session.peer_finder.find_peers_for_blob(blob_hash)
+        timeout = timeout or conf.settings['peer_search_timeout']
+
+        d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout)
         d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
         d.addCallback(lambda r: self._render_response(r))
         return d
@@ -2263,12 +2267,15 @@
         d = self._render_response(self.session.blob_tracker.last_mean_availability)
         return d
 
-    def jsonrpc_get_availability(self, name):
+    @defer.inlineCallbacks
+    def jsonrpc_get_availability(self, name, sd_timeout=None, peer_timeout=None):
         """
         Get stream availability for a winning claim
 
         Arg:
             name (str): lbry uri
+            sd_timeout (int, optional): sd blob download timeout
+            peer_timeout (int, optional): how long to look for peers
 
         Returns:
             peers per blob / total blobs
@@ -2284,17 +2291,38 @@
             else:
                 return 0.0
 
-        d = self._resolve_name(name, force_refresh=True)
-        d.addCallback(get_sd_hash)
-        d.addCallback(self._download_sd_blob)
-        d.addCallbacks(
-            lambda descriptor: [blob.get('blob_hash') for blob in descriptor['blobs']],
-            lambda _: [])
-        d.addCallback(self.session.blob_tracker.get_availability_for_blobs)
-        d.addCallback(_get_mean)
-        d.addCallback(lambda result: self._render_response(result))
+        def read_sd_blob(sd_blob):
+            sd_blob_file = sd_blob.open_for_reading()
+            decoded_sd_blob = json.loads(sd_blob_file.read())
+            sd_blob.close_read_handle(sd_blob_file)
+            return decoded_sd_blob  # caller subscripts the decoded descriptor
 
-        return d
+        metadata = yield self._resolve_name(name)
+        sd_hash = get_sd_hash(metadata)
+        sd_timeout = sd_timeout or conf.settings['sd_download_timeout']
+        peer_timeout = peer_timeout or conf.settings['peer_search_timeout']
+        blobs = []
+        try:
+            blobs = yield self.get_blobs_for_sd_hash(sd_hash)
+            need_sd_blob = False
+            log.info("Already have sd blob")
+        except NoSuchSDHash:
+            need_sd_blob = True
+            log.info("Need sd blob")
+        blob_hashes = [blob.blob_hash for blob in blobs]
+        if need_sd_blob:
+            # we don't want to use self._download_descriptor here because it would create a stream
+            sd_blob = yield self._download_blob(sd_hash, timeout=sd_timeout)
+            decoded = read_sd_blob(sd_blob)
+            blob_hashes = [blob.get("blob_hash") for blob in decoded['blobs']
+                           if blob.get("blob_hash")]
+        sample = random.sample(blob_hashes, min(len(blob_hashes), 5))
+        log.info("check peers for %i of %i blobs in stream", len(sample), len(blob_hashes))
+        availabilities = yield self.session.blob_tracker.get_availability_for_blobs(sample,
+                                                                                    peer_timeout)
+        mean_availability = _get_mean(availabilities)
+        response = yield self._render_response(mean_availability)
+        defer.returnValue(response)
 
     def jsonrpc_get_start_notice(self):
         """