add timeout to download_sd_blob and StandaloneBlobDownloader

This commit is contained in:
Kay Kurokawa 2017-06-20 18:56:08 -04:00
parent 1c1b5096be
commit bd4dd8e114
3 changed files with 32 additions and 10 deletions

View file

@ -9,6 +9,10 @@ class DuplicateStreamHashError(Exception):
class DownloadCanceledError(Exception): class DownloadCanceledError(Exception):
pass pass
class DownloadTimeoutError(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download {} within timeout'.format(download))
self.download = download
class RequestCanceledError(Exception): class RequestCanceledError(Exception):
pass pass

View file

@ -236,7 +236,7 @@ class StreamDescriptorIdentifier(object):
return d return d
def download_sd_blob(session, blob_hash, payment_rate_manager): def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
""" """
Downloads a single blob from the network Downloads a single blob from the network
@ -253,5 +253,6 @@ def download_sd_blob(session, blob_hash, payment_rate_manager):
session.peer_finder, session.peer_finder,
session.rate_limiter, session.rate_limiter,
payment_rate_manager, payment_rate_manager,
session.wallet) session.wallet,
timeout)
return downloader.download() return downloader.download()

View file

@ -5,11 +5,11 @@ from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.DownloadManager import DownloadManager from lbrynet.core.client.DownloadManager import DownloadManager
from lbrynet.core.Error import InvalidBlobHashError from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError
from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet import defer from twisted.internet import defer
from twisted.internet import LoopingCall from twisted.internet.task import LoopingCall
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -32,14 +32,17 @@ class SingleBlobMetadataHandler(object):
class SingleProgressManager(object): class SingleProgressManager(object):
def __init__(self, finished_callback, download_manager): def __init__(self, download_manager, finished_callback, timeout_callback, timeout):
self.finished_callback = finished_callback self.finished_callback = finished_callback
self.timeout_callback = timeout_callback
self.download_manager = download_manager self.download_manager = download_manager
self.timeout = timeout
self.timeout_counter = 0
self.checker = LoopingCall(self._check_if_finished) self.checker = LoopingCall(self._check_if_finished)
def start(self): def start(self):
safe_start_looping_call(self.checker,1) safe_start_looping_call(self.checker, 1)
return defer.succeed(True) return defer.succeed(True)
def stop(self): def stop(self):
@ -52,6 +55,11 @@ class SingleProgressManager(object):
log.debug("The blob %s has been downloaded. Calling the finished callback", str(blob_downloaded)) log.debug("The blob %s has been downloaded. Calling the finished callback", str(blob_downloaded))
safe_stop_looping_call(self.checker) safe_stop_looping_call(self.checker)
self.finished_callback(blob_downloaded) self.finished_callback(blob_downloaded)
elif self.timeout is not None:
self.timeout_counter +=1
if self.timeout_counter >= self.timeout:
safe_stop_looping_call(self.checker)
self.timeout_callback()
def stream_position(self): def stream_position(self):
blobs = self.download_manager.blobs blobs = self.download_manager.blobs
@ -75,13 +83,15 @@ class DummyBlobHandler(object):
class StandaloneBlobDownloader(object): class StandaloneBlobDownloader(object):
def __init__(self, blob_hash, blob_manager, peer_finder, def __init__(self, blob_hash, blob_manager, peer_finder,
rate_limiter, payment_rate_manager, wallet): rate_limiter, payment_rate_manager, wallet,
timeout = None):
self.blob_hash = blob_hash self.blob_hash = blob_hash
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.peer_finder = peer_finder self.peer_finder = peer_finder
self.rate_limiter = rate_limiter self.rate_limiter = rate_limiter
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.wallet = wallet self.wallet = wallet
self.timeout = timeout
self.download_manager = None self.download_manager = None
self.finished_deferred = None self.finished_deferred = None
@ -99,8 +109,10 @@ class StandaloneBlobDownloader(object):
self.download_manager) self.download_manager)
self.download_manager.blob_info_finder = SingleBlobMetadataHandler(self.blob_hash, self.download_manager.blob_info_finder = SingleBlobMetadataHandler(self.blob_hash,
self.download_manager) self.download_manager)
self.download_manager.progress_manager = SingleProgressManager(self._blob_downloaded, self.download_manager.progress_manager = SingleProgressManager(self.download_manager,
self.download_manager) self._blob_downloaded,
self._download_timedout,
self.timeout)
self.download_manager.blob_handler = DummyBlobHandler() self.download_manager.blob_handler = DummyBlobHandler()
self.download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() self.download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
self.download_manager.connection_manager = ConnectionManager( self.download_manager.connection_manager = ConnectionManager(
@ -120,6 +132,11 @@ class StandaloneBlobDownloader(object):
if not self.finished_deferred.called: if not self.finished_deferred.called:
self.finished_deferred.callback(blob) self.finished_deferred.callback(blob)
def _download_timedout(self):
self.stop()
if not self.finished_deferred.called:
self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash))
def insufficient_funds(self, err): def insufficient_funds(self, err):
self.stop() self.stop()
if not self.finished_deferred.called: if not self.finished_deferred.called: