add timeout to get_availability and peer_list

-add optional timeout to DHTPeerFinder.find_peers_for_blob
-add peer_search_timeout setting
This commit is contained in:
Jack Robison 2017-02-15 23:38:33 -05:00
parent bcd026a1b6
commit 25ec8fde23
4 changed files with 67 additions and 30 deletions

View file

@ -197,6 +197,7 @@ ADJUSTABLE_SETTINGS = {
'run_on_startup': (bool, False), 'run_on_startup': (bool, False),
'run_reflector_server': (bool, False), 'run_reflector_server': (bool, False),
'sd_download_timeout': (int, 3), 'sd_download_timeout': (int, 3),
'peer_search_timeout': (int, 3),
'search_servers': (list, ['lighthouse1.lbry.io:50005']), 'search_servers': (list, ['lighthouse1.lbry.io:50005']),
'search_timeout': (float, 5.0), 'search_timeout': (float, 5.0),
'startup_scripts': (list, []), 'startup_scripts': (list, []),

View file

@ -38,17 +38,17 @@ class BlobAvailabilityTracker(object):
if self._check_mine.running: if self._check_mine.running:
self._check_mine.stop() self._check_mine.stop()
def get_blob_availability(self, blob): def get_blob_availability(self, blob, timeout=None):
def _get_peer_count(peers): def _get_peer_count(peers):
have_blob = sum(1 for peer in peers if peer.is_available()) have_blob = sum(1 for peer in peers if peer.is_available())
return {blob: have_blob} return {blob: have_blob}
d = self._peer_finder.find_peers_for_blob(blob) d = self._peer_finder.find_peers_for_blob(blob, timeout)
d.addCallback(_get_peer_count) d.addCallback(_get_peer_count)
return d return d
def get_availability_for_blobs(self, blobs): def get_availability_for_blobs(self, blobs, timeout=None):
dl = [self.get_blob_availability(blob) for blob in blobs if blob] dl = [self.get_blob_availability(blob, timeout) for blob in blobs if blob]
d = defer.DeferredList(dl) d = defer.DeferredList(dl)
d.addCallback(lambda results: [val for success, val in results if success]) d.addCallback(lambda results: [val for success, val in results if success])
return d return d
@ -57,7 +57,6 @@ class BlobAvailabilityTracker(object):
def last_mean_availability(self): def last_mean_availability(self):
return max(Decimal(0.01), self._last_mean_availability) return max(Decimal(0.01), self._last_mean_availability)
def _update_peers_for_blob(self, blob): def _update_peers_for_blob(self, blob):
def _save_peer_info(blob_hash, peers): def _save_peer_info(blob_hash, peers):
v = {blob_hash: peers} v = {blob_hash: peers}

View file

@ -2,7 +2,9 @@ import binascii
import logging import logging
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer, reactor
from lbrynet.interfaces import IPeerFinder from lbrynet.interfaces import IPeerFinder
from lbrynet.core.utils import short_hash
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -34,21 +36,29 @@ class DHTPeerFinder(object):
def _manage_peers(self): def _manage_peers(self):
pass pass
def find_peers_for_blob(self, blob_hash): @defer.inlineCallbacks
def find_peers_for_blob(self, blob_hash, timeout=None):
def _trigger_timeout():
if not finished_deferred.called:
log.warning("Peer search for %s timed out", short_hash(blob_hash))
finished_deferred.cancel()
bin_hash = binascii.unhexlify(blob_hash) bin_hash = binascii.unhexlify(blob_hash)
finished_deferred = self.dht_node.getPeersForBlob(bin_hash)
def filter_peers(peer_list): if timeout is not None:
peers = set(peer_list) reactor.callLater(timeout, _trigger_timeout)
good_peers = []
for host, port in peers:
peer = self.peer_manager.get_peer(host, port)
if peer.is_available() is True:
good_peers.append(peer)
return good_peers
d = self.dht_node.getPeersForBlob(bin_hash) peer_list = yield finished_deferred
d.addCallback(filter_peers)
return d peers = set(peer_list)
good_peers = []
for host, port in peers:
peer = self.peer_manager.get_peer(host, port)
if peer.is_available() is True:
good_peers.append(peer)
defer.returnValue(good_peers)
def get_most_popular_hashes(self, num_to_return): def get_most_popular_hashes(self, num_to_return):
return self.dht_node.get_most_popular_hashes(num_to_return) return self.dht_node.get_most_popular_hashes(num_to_return)

View file

@ -10,6 +10,7 @@ import simplejson as json
import textwrap import textwrap
from requests import exceptions as requests_exceptions from requests import exceptions as requests_exceptions
from decimal import Decimal from decimal import Decimal
import random
from twisted.web import server from twisted.web import server
from twisted.internet import defer, threads, error, reactor, task from twisted.internet import defer, threads, error, reactor, task
@ -2157,17 +2158,20 @@ class Daemon(AuthJSONRPCServer):
""" """
return self.jsonrpc_peer_list(blob_hash) return self.jsonrpc_peer_list(blob_hash)
def jsonrpc_peer_list(self, blob_hash): def jsonrpc_peer_list(self, blob_hash, timeout=None):
""" """
Get peers for blob hash Get peers for blob hash
Args: Args:
'blob_hash': blob hash 'blob_hash': blob hash
'timeout' (int, optional): peer search timeout
Returns: Returns:
List of contacts List of contacts
""" """
d = self.session.peer_finder.find_peers_for_blob(blob_hash) timeout = timeout or conf.settings['peer_search_timeout']
d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout)
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
d.addCallback(lambda r: self._render_response(r)) d.addCallback(lambda r: self._render_response(r))
return d return d
@ -2263,12 +2267,15 @@ class Daemon(AuthJSONRPCServer):
d = self._render_response(self.session.blob_tracker.last_mean_availability) d = self._render_response(self.session.blob_tracker.last_mean_availability)
return d return d
def jsonrpc_get_availability(self, name): @defer.inlineCallbacks
def jsonrpc_get_availability(self, name, sd_timeout=None, peer_timeout=None):
""" """
Get stream availability for a winning claim Get stream availability for a winning claim
Arg: Arg:
name (str): lbry uri name (str): lbry uri
sd_timeout (int, optional): sd blob download timeout
peer_timeout (int, optional): how long to look for peers
Returns: Returns:
peers per blob / total blobs peers per blob / total blobs
@ -2284,17 +2291,37 @@ class Daemon(AuthJSONRPCServer):
else: else:
return 0.0 return 0.0
d = self._resolve_name(name, force_refresh=True) def read_sd_blob(sd_blob):
d.addCallback(get_sd_hash) sd_blob_file = sd_blob.open_for_reading()
d.addCallback(self._download_sd_blob) decoded_sd_blob = json.loads(sd_blob_file.read())
d.addCallbacks( sd_blob.close_read_handle(sd_blob_file)
lambda descriptor: [blob.get('blob_hash') for blob in descriptor['blobs']],
lambda _: [])
d.addCallback(self.session.blob_tracker.get_availability_for_blobs)
d.addCallback(_get_mean)
d.addCallback(lambda result: self._render_response(result))
return d metadata = yield self._resolve_name(name)
sd_hash = get_sd_hash(metadata)
sd_timeout = sd_timeout or conf.settings['sd_download_timeout']
peer_timeout = peer_timeout or conf.settings['peer_search_timeout']
blobs = []
try:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
need_sd_blob = False
log.info("Already have sd blob")
except NoSuchSDHash:
need_sd_blob = True
log.info("Need sd blob")
blob_hashes = [blob.blob_hash for blob in blobs]
if need_sd_blob:
# we don't want to use self._download_descriptor here because it would create a stream
sd_blob = yield self._download_blob(sd_hash, timeout=sd_timeout)
decoded = read_sd_blob(sd_blob)
blob_hashes = [blob.get("blob_hash") for blob in decoded['blobs']
if blob.get("blob_hash")]
sample = random.sample(blob_hashes, min(len(blob_hashes), 5))
log.info("check peers for %i of %i blobs in stream", len(sample), len(blob_hashes))
availabilities = yield self.session.blob_tracker.get_availability_for_blobs(sample,
peer_timeout)
mean_availability = _get_mean(availabilities)
response = yield self._render_response(mean_availability)
defer.returnValue(response)
def jsonrpc_get_start_notice(self): def jsonrpc_get_start_notice(self):
""" """