refactor GetStream

-convert to inline callbacks

-return more errors than just timeouts

-delete on timeout

-have a more understandable return condition (first data blob having
downloaded)
This commit is contained in:
Jack Robison 2017-02-12 20:57:23 -05:00
parent b51d3a28e3
commit 2190f4ac85
3 changed files with 167 additions and 146 deletions

View file

@ -47,7 +47,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
@defer.inlineCallbacks
def restore(self):
sd_hash = yield self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)
sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
if sd_hash:
self.sd_hash = sd_hash[0]
else:

View file

@ -763,8 +763,12 @@ class Daemon(AuthJSONRPCServer):
"""
timeout = timeout if timeout is not None else conf.settings['download_timeout']
helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write)
try:
helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name,
wait_for_write)
except Exception as err:
log.exception(err)
raise err
if not stream_info:
self.waiting_on[name] = True
@ -806,8 +810,7 @@ class Daemon(AuthJSONRPCServer):
timeout=timeout,
download_directory=download_directory,
file_name=file_name)
d = self.streams[name].start(stream_info, name)
return d
return self.streams[name].start(stream_info, name)
def _get_long_count_timestamp(self):
dt = utils.utcnow() - utils.datetime_obj(year=2012, month=12, day=21)
@ -842,7 +845,7 @@ class Daemon(AuthJSONRPCServer):
if stream_count == 0:
yield self.stream_info_manager.delete_stream(stream_hash)
else:
log.warning("Can't delete stream info for %s", stream_hash)
log.warning("Can't delete stream info for %s, count is %i", stream_hash, stream_count)
if delete_file:
if os.path.isfile(filename):
os.remove(filename)
@ -1490,8 +1493,7 @@ class Daemon(AuthJSONRPCServer):
max_tries = 3
while tries <= max_tries:
try:
log.info(
'Making try %s / %s to start download of %s', tries, max_tries, name)
log.info('Making try %s / %s to start download of %s', tries, max_tries, name)
new_sd_hash, file_path = yield self._download_name(
name=name,
timeout=timeout,
@ -1502,10 +1504,10 @@ class Daemon(AuthJSONRPCServer):
)
break
except Exception as e:
log.exception('Failed to get %s', name)
log.warning('Failed to get %s', name)
if tries == max_tries:
self.analytics_manager.send_download_errored(download_id, name, stream_info)
response = yield self._render_response(str(e))
response = yield self._render_response(e.message)
defer.returnValue(response)
tries += 1
# TODO: should stream_hash key be changed to sd_hash?
@ -1515,7 +1517,7 @@ class Daemon(AuthJSONRPCServer):
}
stream = self.streams.get(name)
if stream:
stream.downloader.finished_deferred.addCallback(
stream.finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(
download_id, name, stream_info)
)
@ -2326,9 +2328,7 @@ def get_sd_hash(stream_info):
class _DownloadNameHelper(object):
def __init__(self, daemon, name,
timeout=None,
download_directory=None, file_name=None,
def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None,
wait_for_write=True):
self.daemon = daemon
self.name = name
@ -2378,16 +2378,24 @@ class _DownloadNameHelper(object):
@defer.inlineCallbacks
def _get_stream(self, stream_info):
was_successful, sd_hash, download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
if not was_successful:
log.warning("lbry://%s timed out, removing from streams", self.name)
try:
download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
except (InsufficientFundsError, Exception) as err:
if Failure(err).check(InsufficientFundsError):
log.warning("Insufficient funds to download lbry://%s", self.name)
self.remove_from_wait("Insufficient funds")
else:
log.warning("lbry://%s timed out, removing from streams", self.name)
self.remove_from_wait("Timed out")
if self.daemon.streams[self.name].downloader is not None:
yield self.daemon._delete_lbry_file(self.daemon.streams[self.name].downloader)
del self.daemon.streams[self.name]
self.remove_from_wait("Timed out")
raise Exception("Timed out")
raise err
if self.wait_for_write:
yield self._wait_for_write()
defer.returnValue((sd_hash, download_path))
defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path))
def _wait_for_write(self):
d = defer.succeed(None)

View file

@ -1,7 +1,5 @@
import logging
import os
from copy import deepcopy
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@ -15,7 +13,6 @@ INITIALIZING_CODE = 'initializing'
DOWNLOAD_METADATA_CODE = 'downloading_metadata'
DOWNLOAD_TIMEOUT_CODE = 'timeout'
DOWNLOAD_RUNNING_CODE = 'running'
# TODO: is this ever used?
DOWNLOAD_STOPPED_CODE = 'stopped'
STREAM_STAGES = [
(INITIALIZING_CODE, 'Initializing'),
@ -29,144 +26,160 @@ STREAM_STAGES = [
log = logging.getLogger(__name__)
def safe_start(looping_call):
if not looping_call.running:
looping_call.start(1)
def safe_stop(looping_call):
if looping_call.running:
looping_call.stop()
class GetStream(object):
def __init__(self, sd_identifier, session, wallet,
lbry_file_manager, exchange_rate_manager,
max_key_fee, data_rate=0.5, timeout=None,
download_directory=None, file_name=None):
if timeout is None:
timeout = conf.settings['download_timeout']
self.wallet = wallet
self.resolved_name = None
self.description = None
self.fee = None
self.data_rate = data_rate
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, exchange_rate_manager,
max_key_fee, data_rate=None, timeout=None, download_directory=None,
file_name=None):
self.timeout = timeout or conf.settings['download_timeout']
self.data_rate = data_rate or conf.settings['data_rate']
self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1]
self.download_directory = download_directory or conf.settings['download_directory']
self.file_name = file_name
self.timeout_counter = 0
self.code = None
self.sd_hash = None
self.wallet = wallet
self.session = session
self.exchange_rate_manager = exchange_rate_manager
self.payment_rate_manager = self.session.payment_rate_manager
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.sd_hash = None
self.max_key_fee = max_key_fee
self.stream_info = None
self.stream_info_manager = None
self._d = defer.Deferred(None)
self.timeout = timeout
self.timeout_counter = 0
self.download_directory = download_directory
self.download_path = None
self.downloader = None
# fired after the metadata has been downloaded and the
# actual file has been started
self.finished = defer.Deferred(None)
self.checker = LoopingCall(self.check_status)
self.code = STREAM_STAGES[0]
# fired when the download is complete
self.finished_deferred = defer.Deferred(None)
# fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None)
@property
def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status):
if status.num_completed and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop(self.checker)
else:
log.info("Downloading stream data (%i seconds)", self.timeout_counter)
def check_status(self):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter >= self.timeout:
if not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(Exception("Timeout"))
safe_stop(self.checker)
elif self.downloader:
d = self.downloader.status()
d.addCallback(self._check_status)
else:
log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter)
# download_path is set after the sd blob has been downloaded
if self.download_path:
self.checker.stop()
self.finished.callback((True, self.sd_hash, self.download_path))
elif self.timeout_counter >= self.timeout:
log.info("Timeout downloading lbry://%s", self.resolved_name)
self.checker.stop()
self._d.cancel()
self.code = STREAM_STAGES[4]
self.finished.callback((False, None, None))
def _convert_max_fee(self):
def convert_max_fee(self):
max_fee = FeeValidator(self.max_key_fee)
if max_fee.currency_symbol == "LBC":
return max_fee.amount
return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount
def set_status(self, status, name):
log.info("Download lbry://%s status changed to %s" % (name, status))
self.code = next(s for s in STREAM_STAGES if s[0] == status)
def check_fee(self, fee):
validated_fee = FeeValidator(fee)
max_key_fee = self.convert_max_fee()
converted_fee = self.exchange_rate_manager.to_lbc(validated_fee).amount
if converted_fee > self.wallet.get_balance():
raise InsufficientFundsError('Unable to pay the key fee of %s' % converted_fee)
if converted_fee > max_key_fee:
raise KeyFeeAboveMaxAllowed('Key fee %s above max allowed %s' % (converted_fee,
max_key_fee))
return validated_fee
def get_downloader_factory(self, factories):
for factory in factories:
if isinstance(factory, ManagedEncryptedFileDownloaderFactory):
return factory
raise Exception('No suitable factory was found in {}'.format(factories))
@defer.inlineCallbacks
def get_downloader(self, factory, stream_metadata):
downloader_options = [self.data_rate, True]
downloader = yield factory.make_downloader(stream_metadata, downloader_options,
self.payment_rate_manager,
download_directory=self.download_directory,
file_name=self.file_name)
defer.returnValue(downloader)
def _pay_key_fee(self, address, fee_lbc, name):
log.info("Pay key fee %f --> %s", fee_lbc, address)
reserved_points = self.wallet.reserve_points(address, fee_lbc)
if reserved_points is None:
raise InsufficientFundsError('Unable to pay the key fee of %s for %s' % (fee_lbc, name))
return self.wallet.send_points_to_address(reserved_points, fee_lbc)
@defer.inlineCallbacks
def pay_key_fee(self, fee, name):
if fee is not None:
fee_lbc = self.exchange_rate_manager.to_lbc(fee).amount
yield self._pay_key_fee(fee.address, fee_lbc, name)
else:
defer.returnValue(None)
@defer.inlineCallbacks
def finish(self, results, name):
self.set_status(DOWNLOAD_STOPPED_CODE, name)
log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6],
self.download_path)
safe_stop(self.checker)
status = yield self.downloader.status()
self._check_status(status)
defer.returnValue(self.download_path)
@defer.inlineCallbacks
def download(self, stream_info, name):
self.set_status(INITIALIZING_CODE, name)
self.sd_hash = stream_info['sources']['lbry_sd_hash']
if 'fee' in stream_info:
fee = self.check_fee(stream_info['fee'])
else:
fee = None
self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager)
stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob)
factory = self.get_downloader_factory(stream_metadata.factories)
self.downloader = yield self.get_downloader(factory, stream_metadata)
self.set_status(DOWNLOAD_RUNNING_CODE, name)
if fee:
yield self.pay_key_fee(fee, name)
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallback(self.finish, name)
yield self.data_downloading_deferred
@defer.inlineCallbacks
def start(self, stream_info, name):
def _cancel(err):
# this callback sequence gets cancelled in check_status if
# it takes too long when that happens, we want the logic
# to live in check_status
if err.check(defer.CancelledError):
return
if self.checker:
self.checker.stop()
self.finished.errback(err)
def _set_status(x, status):
log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status))
self.code = next(s for s in STREAM_STAGES if s[0] == status)
return x
def get_downloader_factory(metadata):
for factory in metadata.factories:
if isinstance(factory, ManagedEncryptedFileDownloaderFactory):
return factory, metadata
raise Exception('No suitable factory was found in {}'.format(metadata.factories))
def make_downloader(args):
factory, metadata = args
return factory.make_downloader(metadata,
[self.data_rate, True],
self.payment_rate_manager,
download_directory=self.download_directory,
file_name=self.file_name)
self.resolved_name = name
self.stream_info = deepcopy(stream_info)
self.description = self.stream_info['description']
self.sd_hash = self.stream_info['sources']['lbry_sd_hash']
if 'fee' in self.stream_info:
self.fee = FeeValidator(self.stream_info['fee'])
max_key_fee = self._convert_max_fee()
converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount
if converted_fee > self.wallet.get_balance():
msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format(
self.resolved_name, converted_fee, self.wallet.get_balance())
raise InsufficientFundsError(msg)
if converted_fee > max_key_fee:
msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format(
converted_fee, max_key_fee, self.resolved_name)
raise KeyFeeAboveMaxAllowed(msg)
log.info(
"Key fee %f below limit of %f, downloading lbry://%s",
converted_fee, max_key_fee, self.resolved_name)
self.checker.start(1)
self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
self._d.addCallback(lambda _: download_sd_blob(
self.session, self.sd_hash, self.payment_rate_manager))
self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
self._d.addCallback(get_downloader_factory)
self._d.addCallback(make_downloader)
self._d.addCallbacks(self._start_download, _cancel)
self._d.callback(None)
return self.finished
def _start_download(self, downloader):
log.info('Starting download for %s', self.resolved_name)
self.downloader = downloader
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d = self._pay_key_fee()
d.addCallback(lambda _: log.info(
"Downloading %s --> %s", self.sd_hash, self.downloader.file_name))
d.addCallback(lambda _: self.downloader.start())
def _pay_key_fee(self):
if self.fee is not None:
fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
if reserved_points is None:
log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name)
# TODO: If we get here, nobody will know that there was an error
# as nobody actually cares about self._d
return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, fee_lbc)
return defer.succeed(None)
try:
safe_start(self.checker)
yield self.download(stream_info, name)
defer.returnValue(self.download_path)
except Exception as err:
safe_stop(self.checker)
raise err