diff --git a/CHANGELOG.md b/CHANGELOG.md index fc4b5563c..d49ca865b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ at anytime. * ### Fixed - * + * Fixed timeout behaviour when calling API command get * ### Deprecated diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 139cd5bdf..67dde9dd4 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -9,6 +9,10 @@ class DuplicateStreamHashError(Exception): class DownloadCanceledError(Exception): pass +class DownloadTimeoutError(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download {} within timeout'.format(download)) + self.download = download class RequestCanceledError(Exception): pass diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 027393c4f..8c12afe39 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -236,7 +236,7 @@ class StreamDescriptorIdentifier(object): return d -def download_sd_blob(session, blob_hash, payment_rate_manager): +def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): """ Downloads a single blob from the network @@ -253,5 +253,6 @@ def download_sd_blob(session, blob_hash, payment_rate_manager): session.peer_finder, session.rate_limiter, payment_rate_manager, - session.wallet) + session.wallet, + timeout) return downloader.download() diff --git a/lbrynet/core/client/DownloadManager.py b/lbrynet/core/client/DownloadManager.py index a802d7841..cd74588ec 100644 --- a/lbrynet/core/client/DownloadManager.py +++ b/lbrynet/core/client/DownloadManager.py @@ -1,6 +1,5 @@ import logging from twisted.internet import defer -from twisted.python import failure from zope.interface import implements from lbrynet import interfaces @@ -29,37 +28,17 @@ class DownloadManager(object): d.addCallback(lambda _: self.resume_downloading()) return d + @defer.inlineCallbacks def resume_downloading(self): + yield self.connection_manager.start() + yield self.progress_manager.start() + defer.returnValue(True) - def check_start(result, manager): - if isinstance(result, failure.Failure): - log.error("Failed to start the %s: %s", manager, result.getErrorMessage()) - return False - return True - - d1 = self.progress_manager.start() - d1.addBoth(check_start, "progress manager") - d2 = self.connection_manager.start() - d2.addBoth(check_start, "connection manager") - dl = defer.DeferredList([d1, d2]) - dl.addCallback(lambda xs: False not in xs) - return dl - + @defer.inlineCallbacks def stop_downloading(self): - - def check_stop(result, manager): - if isinstance(result, failure.Failure): - log.error("Failed to stop the %s: %s", manager, result.getErrorMessage()) - return False - return True - - d1 = self.progress_manager.stop() - d1.addBoth(check_stop, "progress manager") - d2 = self.connection_manager.stop() - d2.addBoth(check_stop, "connection manager") - dl = defer.DeferredList([d1, d2], consumeErrors=True) - dl.addCallback(lambda results: all([success for success, val in results])) - return dl + yield self.progress_manager.stop() + yield self.connection_manager.stop() + defer.returnValue(True) def add_blobs_to_download(self, blob_infos): diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index e4f44ca6e..97dc4727f 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -5,11 +5,11 @@ from lbrynet.core.BlobInfo import BlobInfo from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.DownloadManager import DownloadManager -from lbrynet.core.Error import InvalidBlobHashError -from lbrynet.core.utils import is_valid_blobhash +from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError +from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call from twisted.python.failure import Failure from twisted.internet import defer - +from twisted.internet.task import LoopingCall log = logging.getLogger(__name__) @@ -32,36 +32,35 @@ class SingleBlobMetadataHandler(object): class SingleProgressManager(object): - def __init__(self, finished_callback, download_manager): + def __init__(self, download_manager, finished_callback, timeout_callback, timeout): self.finished_callback = finished_callback - self.finished = False + self.timeout_callback = timeout_callback self.download_manager = download_manager - self._next_check_if_finished = None + + self.timeout = timeout + self.timeout_counter = 0 + self.checker = LoopingCall(self._check_if_finished) def start(self): - - from twisted.internet import reactor - - assert self._next_check_if_finished is None - self._next_check_if_finished = reactor.callLater(0, self._check_if_finished) + safe_start_looping_call(self.checker, 1) return defer.succeed(True) def stop(self): - if self._next_check_if_finished is not None: - self._next_check_if_finished.cancel() - self._next_check_if_finished = None + safe_stop_looping_call(self.checker) return defer.succeed(True) def _check_if_finished(self): - - from twisted.internet import reactor - - self._next_check_if_finished = None - if self.finished is False: - if self.stream_position() == 1: - self.blob_downloaded(self.download_manager.blobs[0], 0) - else: - self._next_check_if_finished = reactor.callLater(1, self._check_if_finished) + if self.stream_position() == 1: + blob_downloaded = self.download_manager.blobs[0] + log.debug("The blob %s has been downloaded. Calling the finished callback", + str(blob_downloaded)) + safe_stop_looping_call(self.checker) + self.finished_callback(blob_downloaded) + elif self.timeout is not None: + self.timeout_counter += 1 + if self.timeout_counter >= self.timeout: + safe_stop_looping_call(self.checker) + self.timeout_callback() def stream_position(self): blobs = self.download_manager.blobs @@ -74,15 +73,6 @@ class SingleProgressManager(object): assert len(blobs) == 1 return [b for b in blobs.itervalues() if not b.is_validated()] - def blob_downloaded(self, blob, blob_num): - - from twisted.internet import reactor - - log.debug("The blob %s has been downloaded. Calling the finished callback", str(blob)) - if self.finished is False: - self.finished = True - reactor.callLater(0, self.finished_callback, blob) - class DummyBlobHandler(object): def __init__(self): @@ -94,13 +84,15 @@ class DummyBlobHandler(object): class StandaloneBlobDownloader(object): def __init__(self, blob_hash, blob_manager, peer_finder, - rate_limiter, payment_rate_manager, wallet): + rate_limiter, payment_rate_manager, wallet, + timeout=None): self.blob_hash = blob_hash self.blob_manager = blob_manager self.peer_finder = peer_finder self.rate_limiter = rate_limiter self.payment_rate_manager = payment_rate_manager self.wallet = wallet + self.timeout = timeout self.download_manager = None self.finished_deferred = None @@ -118,8 +110,10 @@ class StandaloneBlobDownloader(object): self.download_manager) self.download_manager.blob_info_finder = SingleBlobMetadataHandler(self.blob_hash, self.download_manager) - self.download_manager.progress_manager = SingleProgressManager(self._blob_downloaded, - self.download_manager) + self.download_manager.progress_manager = SingleProgressManager(self.download_manager, + self._blob_downloaded, + self._download_timedout, + self.timeout) self.download_manager.blob_handler = DummyBlobHandler() self.download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() self.download_manager.connection_manager = ConnectionManager( @@ -139,6 +133,11 @@ class StandaloneBlobDownloader(object): if not self.finished_deferred.called: self.finished_deferred.callback(blob) + def _download_timedout(self): + self.stop() + if not self.finished_deferred.called: + self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash)) + def insufficient_funds(self, err): self.stop() if not self.finished_deferred.called: diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index eeb3f6be7..9dd629d92 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -49,6 +49,13 @@ def call_later(delay, func, *args, **kwargs): from twisted.internet import reactor return reactor.callLater(delay, func, *args, **kwargs) +def safe_start_looping_call(looping_call, interval_sec): + if not looping_call.running: + looping_call.start(interval_sec) + +def safe_stop_looping_call(looping_call): + if looping_call.running: + looping_call.stop() def generate_id(num=None): h = get_lbry_hash_obj() diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index a4f8115d2..7c3ab8216 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -599,26 +599,12 @@ class Daemon(AuthJSONRPCServer): :param timeout (int): blob timeout :return: BlobFile """ - - def cb(blob): - if not finished_d.called: - finished_d.callback(blob) - - def eb(): - if not finished_d.called: - finished_d.errback(Exception("Blob (%s) download timed out" % - blob_hash[:SHORT_ID_LEN])) - if not blob_hash: raise Exception("Nothing to download") rate_manager = rate_manager or self.session.payment_rate_manager timeout = timeout or 30 - finished_d = defer.Deferred(None) - reactor.callLater(timeout, eb) - d = download_sd_blob(self.session, blob_hash, rate_manager) - d.addCallback(cb) - return finished_d + return download_sd_blob(self.session, blob_hash, rate_manager, timeout) @defer.inlineCallbacks def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index cc983d2fa..03a201d67 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -5,7 +5,8 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee -from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError +from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader @@ -28,16 +29,6 @@ STREAM_STAGES = [ log = logging.getLogger(__name__) -def safe_start(looping_call): - if not looping_call.running: - looping_call.start(1) - - -def safe_stop(looping_call): - if looping_call.running: - looping_call.stop() - - class GetStream(object): def __init__(self, sd_identifier, session, exchange_rate_manager, max_key_fee, data_rate=None, timeout=None, @@ -72,11 +63,10 @@ class GetStream(object): def _check_status(self, status): stop_condition = (status.num_completed > 0 or status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED) - if stop_condition and not self.data_downloading_deferred.called: self.data_downloading_deferred.callback(True) if self.data_downloading_deferred.called: - safe_stop(self.checker) + safe_stop_looping_call(self.checker) else: log.info("Downloading stream data (%i seconds)", self.timeout_counter) @@ -84,17 +74,14 @@ class GetStream(object): """ Check if we've got the first data blob in the stream yet """ - self.timeout_counter += 1 if self.timeout_counter >= self.timeout: if not self.data_downloading_deferred.called: - self.data_downloading_deferred.errback(Exception("Timeout")) - safe_stop(self.checker) - elif self.downloader: + self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name)) + safe_stop_looping_call(self.checker) + else: d = self.downloader.status() d.addCallback(self._check_status) - else: - log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) def convert_max_fee(self): currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] @@ -158,15 +145,14 @@ class GetStream(object): self.set_status(DOWNLOAD_STOPPED_CODE, name) log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) - safe_stop(self.checker) + safe_stop_looping_call(self.checker) status = yield self.downloader.status() self._check_status(status) defer.returnValue(self.download_path) @defer.inlineCallbacks - def initialize(self, stream_info, name): + def _initialize(self, stream_info): # Set sd_hash and return key_fee from stream_info - self.set_status(INITIALIZING_CODE, name) self.sd_hash = stream_info.source_hash key_fee = None if stream_info.has_fee: @@ -181,15 +167,15 @@ class GetStream(object): defer.returnValue(downloader) @defer.inlineCallbacks - def download(self, name, key_fee): - # download sd blob, and start downloader - self.set_status(DOWNLOAD_METADATA_CODE, name) - sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager) - self.downloader = yield self._create_downloader(sd_blob) + def _download_sd_blob(self): + sd_blob = yield download_sd_blob(self.session, self.sd_hash, + self.payment_rate_manager, self.timeout) + defer.returnValue(sd_blob) - self.set_status(DOWNLOAD_RUNNING_CODE, name) - if key_fee: - yield self.pay_key_fee(key_fee, name) + @defer.inlineCallbacks + def _download(self, sd_blob, name, key_fee): + self.downloader = yield self._create_downloader(sd_blob) + yield self.pay_key_fee(key_fee, name) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() @@ -206,21 +192,21 @@ class GetStream(object): downloader - instance of ManagedEncryptedFileDownloader finished_deferred - deferred callbacked when download is finished """ - key_fee = yield self.initialize(stream_info, name) - safe_start(self.checker) + self.set_status(INITIALIZING_CODE, name) + key_fee = yield self._initialize(stream_info) - try: - yield self.download(name, key_fee) - except Exception as err: - safe_stop(self.checker) - raise + self.set_status(DOWNLOAD_METADATA_CODE, name) + sd_blob = yield self._download_sd_blob() + yield self._download(sd_blob, name, key_fee) + self.set_status(DOWNLOAD_RUNNING_CODE, name) + safe_start_looping_call(self.checker, 1) try: yield self.data_downloading_deferred except Exception as err: self.downloader.stop() - safe_stop(self.checker) + safe_stop_looping_call(self.checker) raise defer.returnValue((self.downloader, self.finished_deferred)) diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index 7e47ac233..0795dc047 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -2,17 +2,18 @@ import types import mock import json from twisted.trial import unittest -from twisted.internet import defer +from twisted.internet import defer, task from lbryschema.claim import ClaimDict from lbrynet.core import Session, PaymentRateManager, Wallet +from lbrynet.core.Error import DownloadTimeoutError from lbrynet.daemon import Downloader from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier -from lbrynet.core.HashBlob import TempBlob -from lbrynet.core.BlobManager import TempBlobManager -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory + +from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker @@ -20,10 +21,48 @@ from tests.mocks import ExchangeRateManager as DummyExchangeRateManager from tests.mocks import BTCLBCFeed, USDBTCFeed from tests.mocks import mock_conf_settings +class MocDownloader(object): + def __init__(self): + self.finish_deferred = defer.Deferred(None) + self.stop_called = False + + self.name = 'test' + self.num_completed = 0 + self.num_known = 1 + self.running_status = ManagedEncryptedFileDownloader.STATUS_RUNNING + + @defer.inlineCallbacks + def status(self): + out = yield EncryptedFileStatusReport(self.name, self.num_completed, self.num_known, self.running_status) + defer.returnValue(out) + + def start(self): + return self.finish_deferred + + def stop(self): + self.stop_called = True + self.finish_deferred.callback(True) + +def moc_initialize(self,stream_info): + self.sd_hash ="d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b" + return None + +def moc_download_sd_blob(self): + return None + +def moc_download(self, sd_blob, name, key_fee): + self.pay_key_fee(key_fee, name) + self.downloader = MocDownloader() + self.downloader.start() + +def moc_pay_key_fee(self, key_fee, name): + self.pay_key_fee_called = True + class GetStreamTests(unittest.TestCase): def init_getstream_with_mocs(self): mock_conf_settings(self) + sd_identifier = mock.Mock(spec=StreamDescriptorIdentifier) session = mock.Mock(spec=Session.Session) session.wallet = mock.Mock(spec=Wallet.LBRYumWallet) @@ -37,8 +76,11 @@ class GetStreamTests(unittest.TestCase): data_rate = {'currency':"LBC", 'amount':0, 'address':''} getstream = Downloader.GetStream(sd_identifier, session, - exchange_rate_manager, max_key_fee, timeout=10, data_rate=data_rate) + exchange_rate_manager, max_key_fee, timeout=3, data_rate=data_rate) + getstream.pay_key_fee_called = False + self.clock = task.Clock() + getstream.checker.clock = self.clock return getstream @defer.inlineCallbacks @@ -56,3 +98,86 @@ class GetStreamTests(unittest.TestCase): yield getstream.start(stream_info,name) + @defer.inlineCallbacks + def test_sd_blob_download_timeout(self): + """ + test that if download_sd_blob fails due to timeout, + DownloadTimeoutError is raised + """ + def download_sd_blob(self): + raise DownloadTimeoutError(self.file_name) + + getstream = self.init_getstream_with_mocs() + getstream._initialize = types.MethodType(moc_initialize, getstream) + getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream) + getstream._download = types.MethodType(moc_download, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + name='test' + stream_info = None + with self.assertRaises(DownloadTimeoutError): + yield getstream.start(stream_info,name) + self.assertFalse(getstream.pay_key_fee_called) + + + @defer.inlineCallbacks + def test_timeout(self): + """ + test that timeout (set to 2 here) exception is raised + when download times out while downloading first blob, and key fee is paid + """ + getstream = self.init_getstream_with_mocs() + getstream._initialize = types.MethodType(moc_initialize, getstream) + getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + getstream._download = types.MethodType(moc_download, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + name='test' + stream_info = None + start = getstream.start(stream_info,name) + self.clock.advance(1) + self.clock.advance(1) + with self.assertRaises(DownloadTimeoutError): + yield start + self.assertTrue(getstream.downloader.stop_called) + self.assertTrue(getstream.pay_key_fee_called) + + @defer.inlineCallbacks + def test_finish_one_blob(self): + """ + test that if we have 1 completed blob, start() returns + and key fee is paid + """ + getstream = self.init_getstream_with_mocs() + getstream._initialize = types.MethodType(moc_initialize, getstream) + + getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + getstream._download = types.MethodType(moc_download, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + name='test' + stream_info = None + start = getstream.start(stream_info,name) + + getstream.downloader.num_completed = 1 + self.clock.advance(1) + + downloader, f_deferred = yield start + self.assertTrue(getstream.pay_key_fee_called) + + + @defer.inlineCallbacks + def test_finish_stopped_downloader(self): + """ + test that if we have a stopped downloader, beforfe a blob is downloaded, + start() returns + """ + getstream = self.init_getstream_with_mocs() + getstream._initialize = types.MethodType(moc_initialize, getstream) + getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + getstream._download = types.MethodType(moc_download, getstream) + name='test' + stream_info = None + start = getstream.start(stream_info,name) + + getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED + self.clock.advance(1) + downloader, f_deferred = yield start +