diff --git a/CHANGELOG.md b/CHANGELOG.md index b94eda05c..3e815cdf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ at anytime. ### Fixed * Race condition from improper initialization and shutdown of the blob manager database - * + * Various fixes for GetStream class used in API command get ### Deprecated * diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 697aff5ce..1c097f7fb 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -660,17 +660,15 @@ class Daemon(AuthJSONRPCServer): self.analytics_manager.send_download_started(download_id, name, claim_dict) self.streams[claim_id] = GetStream(self.sd_identifier, self.session, - self.session.wallet, self.lbry_file_manager, self.exchange_rate_manager, self.max_key_fee, conf.settings['data_rate'], timeout, download_directory, file_name) try: - download = self.streams[claim_id].start(claim_dict, name) - lbry_file = yield download - f_d = self.streams[claim_id].finished_deferred - f_d.addCallback(lambda _: self.analytics_manager.send_download_finished(download_id, - name, - claim_dict)) + lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) + finished_deferred.addCallback( + lambda _: self.analytics_manager.send_download_finished(download_id, + name, + claim_dict)) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) del self.streams[claim_id] except Exception as err: diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index b69c8914c..bec8a2667 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -1,6 +1,6 @@ import logging import os -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.internet.task import LoopingCall from lbryschema.fee import Fee @@ -8,6 +8,7 @@ from lbryschema.fee import Fee from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory +from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet import conf INITIALIZING_CODE = 'initializing' @@ -38,9 +39,10 @@ def safe_stop(looping_call): class GetStream(object): - def __init__(self, sd_identifier, session, wallet, lbry_file_manager, exchange_rate_manager, + def __init__(self, sd_identifier, session, exchange_rate_manager, max_key_fee, data_rate=None, timeout=None, download_directory=None, file_name=None): + self.timeout = timeout or conf.settings['download_timeout'] self.data_rate = data_rate or conf.settings['data_rate'] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] @@ -49,28 +51,29 @@ class GetStream(object): self.timeout_counter = 0 self.code = None self.sd_hash = None - self.wallet = wallet self.session = session + self.wallet = self.session.wallet self.exchange_rate_manager = exchange_rate_manager self.payment_rate_manager = self.session.payment_rate_manager - self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier self.downloader = None self.checker = LoopingCall(self.check_status) # fired when the download is complete - self.finished_deferred = defer.Deferred(None) + self.finished_deferred = None # fired after the metadata and the first data blob have been downloaded self.data_downloading_deferred = defer.Deferred(None) - self._running = False @property def download_path(self): return os.path.join(self.download_directory, self.downloader.file_name) def _check_status(self, status): - if status.num_completed and not self.data_downloading_deferred.called: + stop_condition = (status.num_completed > 0 or + status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED) + + if stop_condition and not self.data_downloading_deferred.called: self.data_downloading_deferred.callback(True) if self.data_downloading_deferred.called: safe_stop(self.checker) @@ -90,7 +93,7 @@ class GetStream(object): elif self.downloader: d = self.downloader.status() d.addCallback(self._check_status) - elif self._running: + else: log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) def convert_max_fee(self): @@ -161,45 +164,64 @@ class GetStream(object): defer.returnValue(self.download_path) @defer.inlineCallbacks - def download(self, stream_info, name): - if self._running: - raise Exception("Already running") - self._running = True - + def initialize(self, stream_info, name): + # Set sd_hash and return key_fee from stream_info self.set_status(INITIALIZING_CODE, name) self.sd_hash = stream_info.source_hash - + key_fee = None if stream_info.has_fee: - try: - fee = yield threads.deferToThread(self.check_fee_and_convert, - stream_info.source_fee) - except Exception as err: - self._running = False - self.finished_deferred.errback(err) - raise err - else: - fee = None + key_fee = yield self.check_fee_and_convert(stream_info.source_fee) + defer.returnValue(key_fee) - self.set_status(DOWNLOAD_METADATA_CODE, name) - sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager) + @defer.inlineCallbacks + def _create_downloader(self, sd_blob): stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) factory = self.get_downloader_factory(stream_metadata.factories) - self.downloader = yield self.get_downloader(factory, stream_metadata) + downloader = yield self.get_downloader(factory, stream_metadata) + defer.returnValue(downloader) + + @defer.inlineCallbacks + def download(self, name, key_fee): + # download sd blob, and start downloader + self.set_status(DOWNLOAD_METADATA_CODE, name) + sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager) + self.downloader = yield self._create_downloader(sd_blob) self.set_status(DOWNLOAD_RUNNING_CODE, name) - if fee: - yield self.pay_key_fee(fee, name) + if key_fee: + yield self.pay_key_fee(key_fee, name) + log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() self.finished_deferred.addCallback(self.finish, name) @defer.inlineCallbacks def start(self, stream_info, name): + """ + Start download + + Returns: + (tuple) Tuple containing (downloader, finished_deferred) + + downloader - instance of ManagedEncryptedFileDownloader + finished_deferred - deferred callbacked when download is finished + """ + key_fee = yield self.initialize(stream_info, name) + safe_start(self.checker) + try: - safe_start(self.checker) - self.download(stream_info, name) - yield self.data_downloading_deferred - defer.returnValue(self.downloader) + yield self.download(name, key_fee) except Exception as err: safe_stop(self.checker) - raise err + raise + + + try: + yield self.data_downloading_deferred + except Exception as err: + self.downloader.stop() + safe_stop(self.checker) + raise + + defer.returnValue((self.downloader, self.finished_deferred)) + diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py new file mode 100644 index 000000000..e5ab7fff2 --- /dev/null +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -0,0 +1,60 @@ +import types +import mock +import json +from twisted.trial import unittest +from twisted.internet import defer + +from lbryschema.claim import ClaimDict + +from lbrynet.core import Session, PaymentRateManager, Wallet +from lbrynet.lbrynet_daemon import Downloader +from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata +from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.core.HashBlob import TempBlob +from lbrynet.core.BlobManager import TempBlobManager +from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory +from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager + +from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker +from tests.mocks import ExchangeRateManager as DummyExchangeRateManager +from tests.mocks import BTCLBCFeed, USDBTCFeed + + +class GetStreamTests(unittest.TestCase): + + def init_getstream_with_mocs(self): + sd_identifier = mock.Mock(spec=StreamDescriptorIdentifier) + session = mock.Mock(spec=Session.Session) + session.wallet = mock.Mock(spec=Wallet.LBRYumWallet) + prm = mock.Mock(spec=PaymentRateManager.NegotiatedPaymentRateManager) + session.payment_rate_manager = prm + market_feeds = [] + rates={} + exchange_rate_manager = DummyExchangeRateManager(market_feeds, rates) + exchange_rate_manager = mock.Mock(spec=ExchangeRateManager) + max_key_fee = {'currency':"LBC", 'amount':10, 'address':''} + data_rate = {'currency':"LBC", 'amount':0, 'address':''} + download_directory = '.' + + + getstream = Downloader.GetStream(sd_identifier, session, + exchange_rate_manager, max_key_fee, timeout=10, data_rate=data_rate, + download_directory=download_directory) + + return getstream + + @defer.inlineCallbacks + def test_init_exception(self): + """ + test that if initialization would fail, by giving it invaild + stream_info, that an exception is thrown + """ + + getstream = self.init_getstream_with_mocs() + name = 'test' + stream_info = None + + with self.assertRaises(AttributeError): + yield getstream.start(stream_info,name) + +