diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 139cd5bdf..67dde9dd4 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -9,6 +9,10 @@ class DuplicateStreamHashError(Exception): class DownloadCanceledError(Exception): pass +class DownloadTimeoutError(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download {} within timeout'.format(download)) + self.download = download class RequestCanceledError(Exception): pass diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 027393c4f..8c12afe39 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -236,7 +236,7 @@ class StreamDescriptorIdentifier(object): return d -def download_sd_blob(session, blob_hash, payment_rate_manager): +def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): """ Downloads a single blob from the network @@ -253,5 +253,6 @@ def download_sd_blob(session, blob_hash, payment_rate_manager): session.peer_finder, session.rate_limiter, payment_rate_manager, - session.wallet) + session.wallet, + timeout) return downloader.download() diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index 1e247b0a2..9f637a730 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -5,11 +5,11 @@ from lbrynet.core.BlobInfo import BlobInfo from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.DownloadManager import DownloadManager -from lbrynet.core.Error import InvalidBlobHashError +from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call from twisted.python.failure import Failure from twisted.internet import defer -from twisted.internet import LoopingCall +from twisted.internet.task import LoopingCall log = logging.getLogger(__name__) @@ -32,14 +32,17 @@ class SingleBlobMetadataHandler(object): class SingleProgressManager(object): - def __init__(self, finished_callback, download_manager): + def __init__(self, download_manager, finished_callback, timeout_callback, timeout): self.finished_callback = finished_callback + self.timeout_callback = timeout_callback self.download_manager = download_manager - self.checker = LoopingCall(self._check_if_finished) + self.timeout = timeout + self.timeout_counter = 0 + self.checker = LoopingCall(self._check_if_finished) def start(self): - safe_start_looping_call(self.checker,1) + safe_start_looping_call(self.checker, 1) return defer.succeed(True) def stop(self): @@ -52,6 +55,11 @@ class SingleProgressManager(object): log.debug("The blob %s has been downloaded. Calling the finished callback", str(blob_downloaded)) safe_stop_looping_call(self.checker) self.finished_callback(blob_downloaded) + elif self.timeout is not None: + self.timeout_counter +=1 + if self.timeout_counter >= self.timeout: + safe_stop_looping_call(self.checker) + self.timeout_callback() def stream_position(self): blobs = self.download_manager.blobs @@ -75,13 +83,15 @@ class DummyBlobHandler(object): class StandaloneBlobDownloader(object): def __init__(self, blob_hash, blob_manager, peer_finder, - rate_limiter, payment_rate_manager, wallet): + rate_limiter, payment_rate_manager, wallet, + timeout = None): self.blob_hash = blob_hash self.blob_manager = blob_manager self.peer_finder = peer_finder self.rate_limiter = rate_limiter self.payment_rate_manager = payment_rate_manager self.wallet = wallet + self.timeout = timeout self.download_manager = None self.finished_deferred = None @@ -99,8 +109,10 @@ class StandaloneBlobDownloader(object): self.download_manager) self.download_manager.blob_info_finder = SingleBlobMetadataHandler(self.blob_hash, self.download_manager) - self.download_manager.progress_manager = SingleProgressManager(self._blob_downloaded, - self.download_manager) + self.download_manager.progress_manager = SingleProgressManager(self.download_manager, + self._blob_downloaded, + self._download_timedout, + self.timeout) self.download_manager.blob_handler = DummyBlobHandler() self.download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() self.download_manager.connection_manager = ConnectionManager( @@ -120,6 +132,11 @@ class StandaloneBlobDownloader(object): if not self.finished_deferred.called: self.finished_deferred.callback(blob) + def _download_timedout(self): + self.stop() + if not self.finished_deferred.called: + self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash)) + def insufficient_funds(self, err): self.stop() if not self.finished_deferred.called: