Merge pull request #674 from lbryio/fix_getstream

Fix GetStream class
This commit is contained in:
Umpei Kay Kurokawa 2017-06-14 15:15:21 -04:00 committed by GitHub
commit 05e556d066
4 changed files with 121 additions and 41 deletions

View file

@ -19,7 +19,7 @@ at anytime.
### Fixed ### Fixed
* Race condition from improper initialization and shutdown of the blob manager database * Race condition from improper initialization and shutdown of the blob manager database
* * Various fixes for GetStream class used in API command get
### Deprecated ### Deprecated
* *

View file

@ -660,17 +660,15 @@ class Daemon(AuthJSONRPCServer):
self.analytics_manager.send_download_started(download_id, name, claim_dict) self.analytics_manager.send_download_started(download_id, name, claim_dict)
self.streams[claim_id] = GetStream(self.sd_identifier, self.session, self.streams[claim_id] = GetStream(self.sd_identifier, self.session,
self.session.wallet, self.lbry_file_manager,
self.exchange_rate_manager, self.max_key_fee, self.exchange_rate_manager, self.max_key_fee,
conf.settings['data_rate'], timeout, conf.settings['data_rate'], timeout,
download_directory, file_name) download_directory, file_name)
try: try:
download = self.streams[claim_id].start(claim_dict, name) lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name)
lbry_file = yield download finished_deferred.addCallback(
f_d = self.streams[claim_id].finished_deferred lambda _: self.analytics_manager.send_download_finished(download_id,
f_d.addCallback(lambda _: self.analytics_manager.send_download_finished(download_id, name,
name, claim_dict))
claim_dict))
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id] del self.streams[claim_id]
except Exception as err: except Exception as err:

View file

@ -1,6 +1,6 @@
import logging import logging
import os import os
from twisted.internet import defer, threads from twisted.internet import defer
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from lbryschema.fee import Fee from lbryschema.fee import Fee
@ -8,6 +8,7 @@ from lbryschema.fee import Fee
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import conf from lbrynet import conf
INITIALIZING_CODE = 'initializing' INITIALIZING_CODE = 'initializing'
@ -38,9 +39,10 @@ def safe_stop(looping_call):
class GetStream(object): class GetStream(object):
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, exchange_rate_manager, def __init__(self, sd_identifier, session, exchange_rate_manager,
max_key_fee, data_rate=None, timeout=None, download_directory=None, max_key_fee, data_rate=None, timeout=None, download_directory=None,
file_name=None): file_name=None):
self.timeout = timeout or conf.settings['download_timeout'] self.timeout = timeout or conf.settings['download_timeout']
self.data_rate = data_rate or conf.settings['data_rate'] self.data_rate = data_rate or conf.settings['data_rate']
self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1]
@ -49,28 +51,29 @@ class GetStream(object):
self.timeout_counter = 0 self.timeout_counter = 0
self.code = None self.code = None
self.sd_hash = None self.sd_hash = None
self.wallet = wallet
self.session = session self.session = session
self.wallet = self.session.wallet
self.exchange_rate_manager = exchange_rate_manager self.exchange_rate_manager = exchange_rate_manager
self.payment_rate_manager = self.session.payment_rate_manager self.payment_rate_manager = self.session.payment_rate_manager
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
self.downloader = None self.downloader = None
self.checker = LoopingCall(self.check_status) self.checker = LoopingCall(self.check_status)
# fired when the download is complete # fired when the download is complete
self.finished_deferred = defer.Deferred(None) self.finished_deferred = None
# fired after the metadata and the first data blob have been downloaded # fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None) self.data_downloading_deferred = defer.Deferred(None)
self._running = False
@property @property
def download_path(self): def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name) return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status): def _check_status(self, status):
if status.num_completed and not self.data_downloading_deferred.called: stop_condition = (status.num_completed > 0 or
status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED)
if stop_condition and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True) self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called: if self.data_downloading_deferred.called:
safe_stop(self.checker) safe_stop(self.checker)
@ -90,7 +93,7 @@ class GetStream(object):
elif self.downloader: elif self.downloader:
d = self.downloader.status() d = self.downloader.status()
d.addCallback(self._check_status) d.addCallback(self._check_status)
elif self._running: else:
log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter)
def convert_max_fee(self): def convert_max_fee(self):
@ -161,45 +164,64 @@ class GetStream(object):
defer.returnValue(self.download_path) defer.returnValue(self.download_path)
@defer.inlineCallbacks @defer.inlineCallbacks
def download(self, stream_info, name): def initialize(self, stream_info, name):
if self._running: # Set sd_hash and return key_fee from stream_info
raise Exception("Already running")
self._running = True
self.set_status(INITIALIZING_CODE, name) self.set_status(INITIALIZING_CODE, name)
self.sd_hash = stream_info.source_hash self.sd_hash = stream_info.source_hash
key_fee = None
if stream_info.has_fee: if stream_info.has_fee:
try: key_fee = yield self.check_fee_and_convert(stream_info.source_fee)
fee = yield threads.deferToThread(self.check_fee_and_convert, defer.returnValue(key_fee)
stream_info.source_fee)
except Exception as err:
self._running = False
self.finished_deferred.errback(err)
raise err
else:
fee = None
self.set_status(DOWNLOAD_METADATA_CODE, name) @defer.inlineCallbacks
sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager) def _create_downloader(self, sd_blob):
stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob)
factory = self.get_downloader_factory(stream_metadata.factories) factory = self.get_downloader_factory(stream_metadata.factories)
self.downloader = yield self.get_downloader(factory, stream_metadata) downloader = yield self.get_downloader(factory, stream_metadata)
defer.returnValue(downloader)
@defer.inlineCallbacks
def download(self, name, key_fee):
# download sd blob, and start downloader
self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager)
self.downloader = yield self._create_downloader(sd_blob)
self.set_status(DOWNLOAD_RUNNING_CODE, name) self.set_status(DOWNLOAD_RUNNING_CODE, name)
if fee: if key_fee:
yield self.pay_key_fee(fee, name) yield self.pay_key_fee(key_fee, name)
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallback(self.finish, name) self.finished_deferred.addCallback(self.finish, name)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self, stream_info, name): def start(self, stream_info, name):
"""
Start download
Returns:
(tuple) Tuple containing (downloader, finished_deferred)
downloader - instance of ManagedEncryptedFileDownloader
finished_deferred - deferred callbacked when download is finished
"""
key_fee = yield self.initialize(stream_info, name)
safe_start(self.checker)
try: try:
safe_start(self.checker) yield self.download(name, key_fee)
self.download(stream_info, name)
yield self.data_downloading_deferred
defer.returnValue(self.downloader)
except Exception as err: except Exception as err:
safe_stop(self.checker) safe_stop(self.checker)
raise err raise
try:
yield self.data_downloading_deferred
except Exception as err:
self.downloader.stop()
safe_stop(self.checker)
raise
defer.returnValue((self.downloader, self.finished_deferred))

View file

@ -0,0 +1,60 @@
import types
import mock
import json
from twisted.trial import unittest
from twisted.internet import defer
from lbryschema.claim import ClaimDict
from lbrynet.core import Session, PaymentRateManager, Wallet
from lbrynet.lbrynet_daemon import Downloader
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.core.HashBlob import TempBlob
from lbrynet.core.BlobManager import TempBlobManager
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
from tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from tests.mocks import BTCLBCFeed, USDBTCFeed
class GetStreamTests(unittest.TestCase):
def init_getstream_with_mocs(self):
sd_identifier = mock.Mock(spec=StreamDescriptorIdentifier)
session = mock.Mock(spec=Session.Session)
session.wallet = mock.Mock(spec=Wallet.LBRYumWallet)
prm = mock.Mock(spec=PaymentRateManager.NegotiatedPaymentRateManager)
session.payment_rate_manager = prm
market_feeds = []
rates={}
exchange_rate_manager = DummyExchangeRateManager(market_feeds, rates)
exchange_rate_manager = mock.Mock(spec=ExchangeRateManager)
max_key_fee = {'currency':"LBC", 'amount':10, 'address':''}
data_rate = {'currency':"LBC", 'amount':0, 'address':''}
download_directory = '.'
getstream = Downloader.GetStream(sd_identifier, session,
exchange_rate_manager, max_key_fee, timeout=10, data_rate=data_rate,
download_directory=download_directory)
return getstream
@defer.inlineCallbacks
def test_init_exception(self):
"""
test that if initialization would fail, by giving it invaild
stream_info, that an exception is thrown
"""
getstream = self.init_getstream_with_mocs()
name = 'test'
stream_info = None
with self.assertRaises(AttributeError):
yield getstream.start(stream_info,name)