lbry-sdk/lbrynet/lbrynet_daemon/Downloader.py

159 lines
6.8 KiB
Python

import json
import logging
import os
from copy import deepcopy
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.metadata.Fee import FeeValidator
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.conf import settings
# Machine-readable status codes, listed in the order a download normally
# moves through them (timeout last).
INITIALIZING_CODE = 'initializing'
DOWNLOAD_METADATA_CODE = 'downloading_metadata'
DOWNLOAD_RUNNING_CODE = 'running'
DOWNLOAD_STOPPED_CODE = 'stopped'
DOWNLOAD_TIMEOUT_CODE = 'timeout'

# Ordered (code, human-readable message) pairs. GetStream indexes this list
# by position (index 0 as the initial stage, index 4 on timeout), so the
# order of the entries must not change.
STREAM_STAGES = [
    (INITIALIZING_CODE, 'Initializing...'),
    (DOWNLOAD_METADATA_CODE, 'Downloading metadata'),
    (DOWNLOAD_RUNNING_CODE, 'Started stream'),
    (DOWNLOAD_STOPPED_CODE, 'Paused stream'),
    (DOWNLOAD_TIMEOUT_CODE, 'Stream timed out'),
]

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)
class GetStream(object):
    """Validate the key fee for a stream, fetch its metadata and start the download.

    Usage: construct, then call ``start(stream_info, name)``. ``start`` returns
    the ``finished`` Deferred, which ``check_status`` fires with
    ``(True, stream_hash, download_path)`` once a download path is known, or
    with ``(False, None, None)`` once ``timeout`` checker ticks have elapsed.

    ``code`` always holds the current ``(status_code, message)`` entry taken
    from ``STREAM_STAGES``.
    """

    def __init__(self, sd_identifier, session, wallet,
                 lbry_file_manager, exchange_rate_manager,
                 max_key_fee, data_rate=0.5, timeout=None,
                 download_directory=None, file_name=None):
        """Store collaborators and initialise the download state.

        ``timeout`` is counted in ``check_status`` ticks (the checker is
        started with a one second interval); when omitted it falls back to
        ``settings.download_timeout``.
        """
        if timeout is None:
            timeout = settings.download_timeout
        self.wallet = wallet
        self.resolved_name = None
        self.description = None
        self.fee = None
        self.data_rate = data_rate
        self.file_name = file_name
        self.session = session
        self.exchange_rate_manager = exchange_rate_manager
        self.payment_rate_manager = self.session.payment_rate_manager
        self.lbry_file_manager = lbry_file_manager
        self.sd_identifier = sd_identifier
        self.stream_hash = None
        self.max_key_fee = max_key_fee
        self.stream_info = None
        self.stream_info_manager = None
        # Callback chain that performs the metadata download and downloader setup.
        self.d = defer.Deferred(None)
        self.timeout = timeout
        self.timeout_counter = 0
        self.download_directory = download_directory
        self.download_path = None
        self.downloader = None
        # Fired exactly once by check_status with the outcome tuple.
        self.finished = defer.Deferred(None)
        self.checker = LoopingCall(self.check_status)
        self.code = STREAM_STAGES[0]

    def check_status(self):
        """Periodic tick: report success once a download path exists, else time out.

        Each call increments ``timeout_counter``. When neither condition holds
        the call is a no-op apart from that increment.
        """
        self.timeout_counter += 1

        # TODO: Why is this the stopping condition for the finished callback?
        if self.download_path:
            self.checker.stop()
            self.finished.callback((True, self.stream_hash, self.download_path))
        elif self.timeout_counter >= self.timeout:
            log.info("Timeout downloading lbry://%s", self.resolved_name)
            self.checker.stop()
            # Cancelling the chain routes into the _cause_timeout errback
            # if the chain is still pending.
            self.d.cancel()
            self.code = STREAM_STAGES[4]
            self.finished.callback((False, None, None))

    def _convert_max_fee(self):
        """Return the configured maximum key fee expressed as an LBC amount."""
        max_fee = FeeValidator(self.max_key_fee)
        if max_fee.currency_symbol == "LBC":
            return max_fee.amount
        return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount

    def start(self, stream_info, name):
        """Check the key fee and kick off the download of ``name``.

        ``stream_info`` must provide ``'description'`` and
        ``['sources']['lbry_sd_hash']``; a ``'fee'`` entry is optional.

        Returns ``self.finished`` on the normal path. If the stream has a fee
        that exceeds the wallet balance or the configured maximum, returns an
        already-failed Deferred (``InsufficientFundsError`` or
        ``KeyFeeAboveMaxAllowed``) and nothing is started.
        """
        def _cause_timeout(err):
            log.info('Cancelling download')
            # Record why the chain failed; previously the Failure was dropped
            # and only the generic message above was logged.
            log.warning("Download of lbry://%s aborted: %r", self.resolved_name, err.value)
            # Push the counter past the limit so the next check_status tick
            # takes the timeout branch (unless a download path is already set).
            self.timeout_counter = self.timeout * 2

        def _set_status(x, status):
            log.info("Download lbry://%s status changed to %s", self.resolved_name, status)
            self.code = next(s for s in STREAM_STAGES if s[0] == status)
            return x

        def get_downloader_factory(metadata):
            for factory in metadata.factories:
                if isinstance(factory, ManagedEncryptedFileDownloaderFactory):
                    return factory, metadata
            raise Exception('No suitable factory was found in {}'.format(metadata.factories))

        def make_downloader(args):
            factory, metadata = args
            return factory.make_downloader(metadata,
                                           [self.data_rate, True],
                                           self.payment_rate_manager,
                                           download_directory=self.download_directory,
                                           file_name=self.file_name)

        self.resolved_name = name
        # Work on a private copy so the caller's dict is never mutated.
        self.stream_info = deepcopy(stream_info)
        self.description = self.stream_info['description']
        self.stream_hash = self.stream_info['sources']['lbry_sd_hash']

        if 'fee' in self.stream_info:
            self.fee = FeeValidator(self.stream_info['fee'])
            max_key_fee = self._convert_max_fee()
            converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount
            if converted_fee > self.wallet.get_balance():
                log.warning("Insufficient funds to download lbry://%s", self.resolved_name)
                return defer.fail(InsufficientFundsError())
            if converted_fee > max_key_fee:
                log.warning("Key fee %f above limit of %f didn't download lbry://%s", converted_fee,
                            max_key_fee,
                            self.resolved_name)
                return defer.fail(KeyFeeAboveMaxAllowed())
            log.info("Key fee %f below limit of %f, downloading lbry://%s", converted_fee,
                     max_key_fee,
                     self.resolved_name)

        self.checker.start(1)

        self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
        self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
        self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
        self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
        self.d.addCallback(get_downloader_factory)
        self.d.addCallback(make_downloader)
        # The errback only sees failures from the steps above, not from
        # _start_download itself (both are attached at the same chain level).
        self.d.addCallbacks(self._start_download, _cause_timeout)
        self.d.callback(None)

        return self.finished

    def _start_download(self, downloader):
        """Record the downloader and its target path, pay the key fee, then start it."""
        log.info('Starting download for %s', self.resolved_name)
        self.downloader = downloader
        # NOTE(review): the path is set before the key fee is paid, so
        # check_status can report success even if the payment below fails.
        # Confirm whether that is intended.
        self.download_path = os.path.join(downloader.download_directory, downloader.file_name)

        d = self._pay_key_fee()
        d.addCallback(lambda _: log.info("Downloading %s --> %s", self.stream_hash, self.downloader.file_name))
        d.addCallback(lambda _: self.downloader.start())
        # This chain is not returned to anyone, so without an errback a failed
        # payment or downloader start would only surface as an
        # "Unhandled error in Deferred" at garbage-collection time.
        d.addErrback(self._log_download_failure)

    def _log_download_failure(self, err):
        """Errback: log a failure from the key-fee payment or the downloader start."""
        log.error("Failed to pay key fee or start download for lbry://%s: %r",
                  self.resolved_name, err.value)

    def _pay_key_fee(self):
        """Pay the stream's key fee, if any, and return a Deferred for the payment.

        Returns an already-succeeded Deferred when the stream has no fee, and
        an already-failed one (``InsufficientFundsError``) when the wallet
        cannot reserve the required points.
        """
        if self.fee is None:
            return defer.succeed(None)

        fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
        reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
        if reserved_points is None:
            return defer.fail(InsufficientFundsError())
        return self.wallet.send_points_to_address(reserved_points, fee_lbc)