Merge pull request #724 from lbryio/add_timeout_download_sd_blob

Add timeout to download_sd_blob
This commit is contained in:
Umpei Kay Kurokawa 2017-07-05 17:09:50 -04:00 committed by GitHub
commit cd98367039
9 changed files with 212 additions and 125 deletions

View file

@ -17,7 +17,7 @@ at anytime.
*
### Fixed
*
* Fixed timeout behaviour when calling API command get
*
### Deprecated

View file

@ -9,6 +9,10 @@ class DuplicateStreamHashError(Exception):
class DownloadCanceledError(Exception):
pass
class DownloadTimeoutError(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download {} within timeout'.format(download))
self.download = download
class RequestCanceledError(Exception):
pass

View file

@ -236,7 +236,7 @@ class StreamDescriptorIdentifier(object):
return d
def download_sd_blob(session, blob_hash, payment_rate_manager):
def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
"""
Downloads a single blob from the network
@ -253,5 +253,6 @@ def download_sd_blob(session, blob_hash, payment_rate_manager):
session.peer_finder,
session.rate_limiter,
payment_rate_manager,
session.wallet)
session.wallet,
timeout)
return downloader.download()

View file

@ -1,6 +1,5 @@
import logging
from twisted.internet import defer
from twisted.python import failure
from zope.interface import implements
from lbrynet import interfaces
@ -29,37 +28,17 @@ class DownloadManager(object):
d.addCallback(lambda _: self.resume_downloading())
return d
@defer.inlineCallbacks
def resume_downloading(self):
yield self.connection_manager.start()
yield self.progress_manager.start()
defer.returnValue(True)
def check_start(result, manager):
if isinstance(result, failure.Failure):
log.error("Failed to start the %s: %s", manager, result.getErrorMessage())
return False
return True
d1 = self.progress_manager.start()
d1.addBoth(check_start, "progress manager")
d2 = self.connection_manager.start()
d2.addBoth(check_start, "connection manager")
dl = defer.DeferredList([d1, d2])
dl.addCallback(lambda xs: False not in xs)
return dl
@defer.inlineCallbacks
def stop_downloading(self):
def check_stop(result, manager):
if isinstance(result, failure.Failure):
log.error("Failed to stop the %s: %s", manager, result.getErrorMessage())
return False
return True
d1 = self.progress_manager.stop()
d1.addBoth(check_stop, "progress manager")
d2 = self.connection_manager.stop()
d2.addBoth(check_stop, "connection manager")
dl = defer.DeferredList([d1, d2], consumeErrors=True)
dl.addCallback(lambda results: all([success for success, val in results]))
return dl
yield self.progress_manager.stop()
yield self.connection_manager.stop()
defer.returnValue(True)
def add_blobs_to_download(self, blob_infos):

View file

@ -5,11 +5,11 @@ from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.DownloadManager import DownloadManager
from lbrynet.core.Error import InvalidBlobHashError
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError
from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call
from twisted.python.failure import Failure
from twisted.internet import defer
from twisted.internet.task import LoopingCall
log = logging.getLogger(__name__)
@ -32,36 +32,35 @@ class SingleBlobMetadataHandler(object):
class SingleProgressManager(object):
def __init__(self, finished_callback, download_manager):
def __init__(self, download_manager, finished_callback, timeout_callback, timeout):
self.finished_callback = finished_callback
self.finished = False
self.timeout_callback = timeout_callback
self.download_manager = download_manager
self._next_check_if_finished = None
self.timeout = timeout
self.timeout_counter = 0
self.checker = LoopingCall(self._check_if_finished)
def start(self):
from twisted.internet import reactor
assert self._next_check_if_finished is None
self._next_check_if_finished = reactor.callLater(0, self._check_if_finished)
safe_start_looping_call(self.checker, 1)
return defer.succeed(True)
def stop(self):
if self._next_check_if_finished is not None:
self._next_check_if_finished.cancel()
self._next_check_if_finished = None
safe_stop_looping_call(self.checker)
return defer.succeed(True)
def _check_if_finished(self):
from twisted.internet import reactor
self._next_check_if_finished = None
if self.finished is False:
if self.stream_position() == 1:
self.blob_downloaded(self.download_manager.blobs[0], 0)
else:
self._next_check_if_finished = reactor.callLater(1, self._check_if_finished)
if self.stream_position() == 1:
blob_downloaded = self.download_manager.blobs[0]
log.debug("The blob %s has been downloaded. Calling the finished callback",
str(blob_downloaded))
safe_stop_looping_call(self.checker)
self.finished_callback(blob_downloaded)
elif self.timeout is not None:
self.timeout_counter += 1
if self.timeout_counter >= self.timeout:
safe_stop_looping_call(self.checker)
self.timeout_callback()
def stream_position(self):
blobs = self.download_manager.blobs
@ -74,15 +73,6 @@ class SingleProgressManager(object):
assert len(blobs) == 1
return [b for b in blobs.itervalues() if not b.is_validated()]
def blob_downloaded(self, blob, blob_num):
from twisted.internet import reactor
log.debug("The blob %s has been downloaded. Calling the finished callback", str(blob))
if self.finished is False:
self.finished = True
reactor.callLater(0, self.finished_callback, blob)
class DummyBlobHandler(object):
def __init__(self):
@ -94,13 +84,15 @@ class DummyBlobHandler(object):
class StandaloneBlobDownloader(object):
def __init__(self, blob_hash, blob_manager, peer_finder,
rate_limiter, payment_rate_manager, wallet):
rate_limiter, payment_rate_manager, wallet,
timeout=None):
self.blob_hash = blob_hash
self.blob_manager = blob_manager
self.peer_finder = peer_finder
self.rate_limiter = rate_limiter
self.payment_rate_manager = payment_rate_manager
self.wallet = wallet
self.timeout = timeout
self.download_manager = None
self.finished_deferred = None
@ -118,8 +110,10 @@ class StandaloneBlobDownloader(object):
self.download_manager)
self.download_manager.blob_info_finder = SingleBlobMetadataHandler(self.blob_hash,
self.download_manager)
self.download_manager.progress_manager = SingleProgressManager(self._blob_downloaded,
self.download_manager)
self.download_manager.progress_manager = SingleProgressManager(self.download_manager,
self._blob_downloaded,
self._download_timedout,
self.timeout)
self.download_manager.blob_handler = DummyBlobHandler()
self.download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
self.download_manager.connection_manager = ConnectionManager(
@ -139,6 +133,11 @@ class StandaloneBlobDownloader(object):
if not self.finished_deferred.called:
self.finished_deferred.callback(blob)
def _download_timedout(self):
self.stop()
if not self.finished_deferred.called:
self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash))
def insufficient_funds(self, err):
self.stop()
if not self.finished_deferred.called:

View file

@ -49,6 +49,13 @@ def call_later(delay, func, *args, **kwargs):
from twisted.internet import reactor
return reactor.callLater(delay, func, *args, **kwargs)
def safe_start_looping_call(looping_call, interval_sec):
if not looping_call.running:
looping_call.start(interval_sec)
def safe_stop_looping_call(looping_call):
if looping_call.running:
looping_call.stop()
def generate_id(num=None):
h = get_lbry_hash_obj()

View file

@ -599,26 +599,12 @@ class Daemon(AuthJSONRPCServer):
:param timeout (int): blob timeout
:return: BlobFile
"""
def cb(blob):
if not finished_d.called:
finished_d.callback(blob)
def eb():
if not finished_d.called:
finished_d.errback(Exception("Blob (%s) download timed out" %
blob_hash[:SHORT_ID_LEN]))
if not blob_hash:
raise Exception("Nothing to download")
rate_manager = rate_manager or self.session.payment_rate_manager
timeout = timeout or 30
finished_d = defer.Deferred(None)
reactor.callLater(timeout, eb)
d = download_sd_blob(self.session, blob_hash, rate_manager)
d.addCallback(cb)
return finished_d
return download_sd_blob(self.session, blob_hash, rate_manager, timeout)
@defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None):

View file

@ -5,7 +5,8 @@ from twisted.internet.task import LoopingCall
from lbryschema.fee import Fee
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
@ -28,16 +29,6 @@ STREAM_STAGES = [
log = logging.getLogger(__name__)
def safe_start(looping_call):
if not looping_call.running:
looping_call.start(1)
def safe_stop(looping_call):
if looping_call.running:
looping_call.stop()
class GetStream(object):
def __init__(self, sd_identifier, session, exchange_rate_manager,
max_key_fee, data_rate=None, timeout=None,
@ -72,11 +63,10 @@ class GetStream(object):
def _check_status(self, status):
stop_condition = (status.num_completed > 0 or
status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED)
if stop_condition and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop(self.checker)
safe_stop_looping_call(self.checker)
else:
log.info("Downloading stream data (%i seconds)", self.timeout_counter)
@ -84,17 +74,14 @@ class GetStream(object):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter >= self.timeout:
if not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(Exception("Timeout"))
safe_stop(self.checker)
elif self.downloader:
self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name))
safe_stop_looping_call(self.checker)
else:
d = self.downloader.status()
d.addCallback(self._check_status)
else:
log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter)
def convert_max_fee(self):
currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount']
@ -158,15 +145,14 @@ class GetStream(object):
self.set_status(DOWNLOAD_STOPPED_CODE, name)
log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6],
self.download_path)
safe_stop(self.checker)
safe_stop_looping_call(self.checker)
status = yield self.downloader.status()
self._check_status(status)
defer.returnValue(self.download_path)
@defer.inlineCallbacks
def initialize(self, stream_info, name):
def _initialize(self, stream_info):
# Set sd_hash and return key_fee from stream_info
self.set_status(INITIALIZING_CODE, name)
self.sd_hash = stream_info.source_hash
key_fee = None
if stream_info.has_fee:
@ -181,15 +167,15 @@ class GetStream(object):
defer.returnValue(downloader)
@defer.inlineCallbacks
def download(self, name, key_fee):
# download sd blob, and start downloader
self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager)
self.downloader = yield self._create_downloader(sd_blob)
def _download_sd_blob(self):
sd_blob = yield download_sd_blob(self.session, self.sd_hash,
self.payment_rate_manager, self.timeout)
defer.returnValue(sd_blob)
self.set_status(DOWNLOAD_RUNNING_CODE, name)
if key_fee:
yield self.pay_key_fee(key_fee, name)
@defer.inlineCallbacks
def _download(self, sd_blob, name, key_fee):
self.downloader = yield self._create_downloader(sd_blob)
yield self.pay_key_fee(key_fee, name)
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start()
@ -206,21 +192,21 @@ class GetStream(object):
downloader - instance of ManagedEncryptedFileDownloader
finished_deferred - deferred callbacked when download is finished
"""
key_fee = yield self.initialize(stream_info, name)
safe_start(self.checker)
self.set_status(INITIALIZING_CODE, name)
key_fee = yield self._initialize(stream_info)
try:
yield self.download(name, key_fee)
except Exception as err:
safe_stop(self.checker)
raise
self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield self._download_sd_blob()
yield self._download(sd_blob, name, key_fee)
self.set_status(DOWNLOAD_RUNNING_CODE, name)
safe_start_looping_call(self.checker, 1)
try:
yield self.data_downloading_deferred
except Exception as err:
self.downloader.stop()
safe_stop(self.checker)
safe_stop_looping_call(self.checker)
raise
defer.returnValue((self.downloader, self.finished_deferred))

View file

@ -2,17 +2,18 @@ import types
import mock
import json
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import defer, task
from lbryschema.claim import ClaimDict
from lbrynet.core import Session, PaymentRateManager, Wallet
from lbrynet.core.Error import DownloadTimeoutError
from lbrynet.daemon import Downloader
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.core.HashBlob import TempBlob
from lbrynet.core.BlobManager import TempBlobManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory
from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
@ -20,10 +21,48 @@ from tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from tests.mocks import BTCLBCFeed, USDBTCFeed
from tests.mocks import mock_conf_settings
class MocDownloader(object):
def __init__(self):
self.finish_deferred = defer.Deferred(None)
self.stop_called = False
self.name = 'test'
self.num_completed = 0
self.num_known = 1
self.running_status = ManagedEncryptedFileDownloader.STATUS_RUNNING
@defer.inlineCallbacks
def status(self):
out = yield EncryptedFileStatusReport(self.name, self.num_completed, self.num_known, self.running_status)
defer.returnValue(out)
def start(self):
return self.finish_deferred
def stop(self):
self.stop_called = True
self.finish_deferred.callback(True)
def moc_initialize(self,stream_info):
self.sd_hash ="d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b"
return None
def moc_download_sd_blob(self):
return None
def moc_download(self, sd_blob, name, key_fee):
self.pay_key_fee(key_fee, name)
self.downloader = MocDownloader()
self.downloader.start()
def moc_pay_key_fee(self, key_fee, name):
self.pay_key_fee_called = True
class GetStreamTests(unittest.TestCase):
def init_getstream_with_mocs(self):
mock_conf_settings(self)
sd_identifier = mock.Mock(spec=StreamDescriptorIdentifier)
session = mock.Mock(spec=Session.Session)
session.wallet = mock.Mock(spec=Wallet.LBRYumWallet)
@ -37,8 +76,11 @@ class GetStreamTests(unittest.TestCase):
data_rate = {'currency':"LBC", 'amount':0, 'address':''}
getstream = Downloader.GetStream(sd_identifier, session,
exchange_rate_manager, max_key_fee, timeout=10, data_rate=data_rate)
exchange_rate_manager, max_key_fee, timeout=3, data_rate=data_rate)
getstream.pay_key_fee_called = False
self.clock = task.Clock()
getstream.checker.clock = self.clock
return getstream
@defer.inlineCallbacks
@ -56,3 +98,86 @@ class GetStreamTests(unittest.TestCase):
yield getstream.start(stream_info,name)
@defer.inlineCallbacks
def test_sd_blob_download_timeout(self):
"""
test that if download_sd_blob fails due to timeout,
DownloadTimeoutError is raised
"""
def download_sd_blob(self):
raise DownloadTimeoutError(self.file_name)
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream)
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
name='test'
stream_info = None
with self.assertRaises(DownloadTimeoutError):
yield getstream.start(stream_info,name)
self.assertFalse(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_timeout(self):
"""
test that timeout (set to 2 here) exception is raised
when download times out while downloading first blob, and key fee is paid
"""
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream)
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
name='test'
stream_info = None
start = getstream.start(stream_info,name)
self.clock.advance(1)
self.clock.advance(1)
with self.assertRaises(DownloadTimeoutError):
yield start
self.assertTrue(getstream.downloader.stop_called)
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_finish_one_blob(self):
"""
test that if we have 1 completed blob, start() returns
and key fee is paid
"""
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream)
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
name='test'
stream_info = None
start = getstream.start(stream_info,name)
getstream.downloader.num_completed = 1
self.clock.advance(1)
downloader, f_deferred = yield start
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_finish_stopped_downloader(self):
"""
test that if we have a stopped downloader, beforfe a blob is downloaded,
start() returns
"""
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream)
name='test'
stream_info = None
start = getstream.start(stream_info,name)
getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
self.clock.advance(1)
downloader, f_deferred = yield start