diff --git a/lbrynet/blob/client/CryptStreamDownloader.py b/lbrynet/blob/client/CryptStreamDownloader.py deleted file mode 100644 index 4e7df06d2..000000000 --- a/lbrynet/blob/client/CryptStreamDownloader.py +++ /dev/null @@ -1,221 +0,0 @@ -import logging -from binascii import unhexlify -from twisted.internet import defer -from twisted.python.failure import Failure - -from lbrynet.conf import Config -from lbrynet.p2p.client.BlobRequester import BlobRequester -from lbrynet.p2p.client.ConnectionManager import ConnectionManager -from lbrynet.p2p.client.DownloadManager import DownloadManager -from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager -from lbrynet.blob.client.CryptBlobHandler import CryptBlobHandler - - -log = logging.getLogger(__name__) - - -class StartFailedError(Exception): - pass - - -class AlreadyRunningError(Exception): - pass - - -class AlreadyStoppedError(Exception): - pass - - -class CurrentlyStoppingError(Exception): - pass - - -class CurrentlyStartingError(Exception): - pass - - -class CryptStreamDownloader: - - #implements(IStreamDownloader) - - def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet, - key, stream_name): - """Initialize a CryptStreamDownloader - - @param peer_finder: An object which implements the IPeerFinder - interface. Used to look up peers by a hashsum. - - @param rate_limiter: An object which implements the IRateLimiter interface - - @param blob_manager: A BlobManager object - - @param payment_rate_manager: A NegotiatedPaymentRateManager object - - @param wallet: An object which implements the IWallet interface - - @return: - - """ - self.conf = conf - self.peer_finder = peer_finder - self.rate_limiter = rate_limiter - self.blob_manager = blob_manager - self.payment_rate_manager = payment_rate_manager - self.wallet = wallet - self.key = unhexlify(key) - self.stream_name = unhexlify(stream_name).decode() - self.completed = False - self.stopped = True - self.stopping = False - self.starting = False - self.download_manager = None - self.finished_deferred = None - self.points_paid = 0.0 - self.blob_requester = None - - def __str__(self): - return str(self.stream_name) - - def toggle_running(self): - if self.stopped is True: - return self.start() - else: - return self.stop() - - def start(self): - if self.starting is True: - raise CurrentlyStartingError() - if self.stopping is True: - raise CurrentlyStoppingError() - if self.stopped is False: - raise AlreadyRunningError() - assert self.download_manager is None - self.starting = True - self.completed = False - self.finished_deferred = defer.Deferred() - d = self._start() - d.addCallback(lambda _: self.finished_deferred) - return d - - @defer.inlineCallbacks - def stop(self, err=None): - if self.stopped is True: - raise AlreadyStoppedError() - if self.stopping is True: - raise CurrentlyStoppingError() - assert self.download_manager is not None - self.stopping = True - success = yield self.download_manager.stop_downloading() - self.stopping = False - if success is True: - self.stopped = True - self._remove_download_manager() - yield self._fire_completed_deferred(err) - - def _start_failed(self): - - def set_stopped(): - self.stopped = True - self.stopping = False - self.starting = False - - if self.download_manager is not None: - d = self.download_manager.stop_downloading() - d.addCallback(lambda _: self._remove_download_manager()) - else: - d = defer.succeed(True) - d.addCallback(lambda _: set_stopped()) - d.addCallback(lambda _: 
Failure(StartFailedError())) - return d - - def _start(self): - - def check_start_succeeded(success): - if success: - self.starting = False - self.stopped = False - self.completed = False - return True - else: - return self._start_failed() - - self.download_manager = self._get_download_manager() - d = self.download_manager.start_downloading() - d.addCallbacks(check_start_succeeded) - return d - - def _get_download_manager(self): - assert self.blob_requester is None - download_manager = DownloadManager(self.blob_manager) - # TODO: can we get rid of these circular references. I'm not - # smart enough to handle thinking about the interactions - # between them and have hope that there is a simpler way - # to accomplish what we want - download_manager.blob_info_finder = self._get_metadata_handler(download_manager) - download_manager.progress_manager = self._get_progress_manager(download_manager) - download_manager.blob_handler = self._get_blob_handler(download_manager) - download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() - # blob_requester needs to be set before the connection manager is setup - self.blob_requester = self._get_blob_requester(download_manager) - download_manager.connection_manager = self._get_connection_manager(download_manager) - return download_manager - - def _remove_download_manager(self): - self.download_manager.blob_info_finder = None - self.download_manager.progress_manager = None - self.download_manager.blob_handler = None - self.download_manager.wallet_info_exchanger = None - self.blob_requester = None - self.download_manager.connection_manager = None - self.download_manager = None - - def _get_primary_request_creators(self, download_manager): - return [self.blob_requester] - - def _get_secondary_request_creators(self, download_manager): - return [download_manager.wallet_info_exchanger] - - def _get_metadata_handler(self, download_manager): - pass - - def _get_blob_requester(self, download_manager): - return BlobRequester(self.blob_manager, self.peer_finder, - self.payment_rate_manager, self.wallet, - download_manager) - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, download_manager) - - def _get_write_func(self): - pass - - def _get_blob_handler(self, download_manager): - return CryptBlobHandler(self.key, self._get_write_func()) - - def _get_connection_manager(self, download_manager): - return ConnectionManager(self, self.rate_limiter, - self._get_primary_request_creators(download_manager), - self._get_secondary_request_creators(download_manager)) - - def _fire_completed_deferred(self, err=None): - self.finished_deferred, d = None, self.finished_deferred - if d is not None: - if err is not None: - d.errback(err) - else: - value = self._get_finished_deferred_callback_value() - d.callback(value) - else: - log.debug("Not firing the completed deferred because d is None") - - def _get_finished_deferred_callback_value(self): - return None - - def _finished_downloading(self, finished): - if finished is True: - self.completed = True - return self.stop() - - def insufficient_funds(self, err): - return self.stop(err=err) diff --git a/lbrynet/blob/client/EncryptedFileDownloader.py b/lbrynet/blob/client/EncryptedFileDownloader.py deleted file mode 100644 index fc57f631d..000000000 --- a/lbrynet/blob/client/EncryptedFileDownloader.py +++ /dev/null @@ -1,199 +0,0 @@ -import os -import logging -import traceback -from binascii import hexlify, unhexlify -from 
twisted.internet import defer, threads - -from lbrynet.conf import Config -from lbrynet.extras.compat import f2d -from lbrynet.p2p.StreamDescriptor import save_sd_info -from lbrynet.blob.client.CryptStreamDownloader import CryptStreamDownloader -from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager -from lbrynet.p2p.Error import FileOpenError -from lbrynet.blob.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler - - -log = logging.getLogger(__name__) - - -class EncryptedFileDownloader(CryptStreamDownloader): - """Classes which inherit from this class download LBRY files""" - - def __init__(self, conf: Config, stream_hash, peer_finder, rate_limiter, blob_manager, - storage, payment_rate_manager, wallet, key, stream_name, file_name): - super().__init__(conf, peer_finder, rate_limiter, blob_manager, - payment_rate_manager, wallet, key, stream_name) - self.stream_hash = stream_hash - self.storage = storage - self.file_name = os.path.basename(unhexlify(file_name).decode()) - self._calculated_total_bytes = None - - @defer.inlineCallbacks - def delete_data(self): - crypt_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash)) - blob_hashes = [b.blob_hash for b in crypt_infos if b.blob_hash] - sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(self.stream_hash)) - blob_hashes.append(sd_hash) - yield self.blob_manager.delete_blobs(blob_hashes) - - def stop(self, err=None): - self._close_output() - return super().stop(err=err) - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, download_manager) - - def _start(self): - def check_start_succeeded(success): - if success: - self.starting = False - self.stopped = False - self.completed = False - return True - else: - return self._start_failed() - - self.download_manager = self._get_download_manager() - d = self._setup_output() - d.addCallback(lambda _: self.download_manager.start_downloading()) - d.addCallbacks(check_start_succeeded) - return d - - def _setup_output(self): - pass - - def _close_output(self): - pass - - async def get_total_bytes(self): - blobs = await self.storage.get_blobs_for_stream(self.stream_hash) - return sum([b.length for b in blobs]) - - def get_total_bytes_cached(self): - if self._calculated_total_bytes is None or self._calculated_total_bytes == 0: - if self.download_manager is None: - return 0 - else: - self._calculated_total_bytes = self.download_manager.calculate_total_bytes() - return self._calculated_total_bytes - - def get_bytes_left_to_output(self): - if self.download_manager is not None: - return self.download_manager.calculate_bytes_left_to_output() - else: - return 0 - - def get_bytes_left_to_download(self): - if self.download_manager is not None: - return self.download_manager.calculate_bytes_left_to_download() - else: - return 0 - - def _get_metadata_handler(self, download_manager): - return EncryptedFileMetadataHandler(self.stream_hash, - self.storage, download_manager) - - -class EncryptedFileDownloaderFactory: - #implements(IStreamDownloaderFactory) - - def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, storage, wallet): - self.conf = conf - self.peer_finder = peer_finder - self.rate_limiter = rate_limiter - self.blob_manager = blob_manager - self.storage = storage - self.wallet = wallet - - def can_download(self, sd_validator): - return True - - def make_downloader(self, metadata, options, payment_rate_manager, **kwargs): - assert 
len(options) == 1 - data_rate = options[0] - payment_rate_manager.min_blob_data_payment_rate = data_rate - - def save_source_if_blob(stream_hash): - return defer.succeed(metadata.source_blob_hash) - - def create_downloader(stream_hash): - downloader = self._make_downloader(stream_hash, payment_rate_manager, - metadata.validator.raw_info) - return defer.succeed(downloader) - - d = save_sd_info(self.blob_manager, metadata.source_blob_hash, metadata.validator.raw_info) - d.addCallback(save_source_if_blob) - d.addCallback(create_downloader) - return d - - def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - pass - - -class EncryptedFileSaver(EncryptedFileDownloader): - def __init__(self, conf: Config, stream_hash, peer_finder, rate_limiter, blob_manager, storage, - payment_rate_manager, wallet, download_directory, key, stream_name, file_name): - super().__init__(conf, stream_hash, peer_finder, rate_limiter, - blob_manager, storage, payment_rate_manager, - wallet, key, stream_name, file_name) - self.download_directory = unhexlify(download_directory).decode() - self.file_written_to = os.path.join(self.download_directory, unhexlify(file_name).decode()) - self.file_handle = None - - def __str__(self): - return str(self.file_written_to) - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, - download_manager) - - def _setup_output(self): - def open_file(): - if self.file_handle is None: - file_written_to = os.path.join(self.download_directory, self.file_name) - try: - self.file_handle = open(file_written_to, 'wb') - self.file_written_to = file_written_to - except IOError: - log.error(traceback.format_exc()) - raise FileOpenError( - "Failed to open %s. Make sure you have permission to save files to that" - " location." 
% file_written_to - ) - return threads.deferToThread(open_file) - - def _close_output(self): - self.file_handle, file_handle = None, self.file_handle - if file_handle is not None: - name = file_handle.name - file_handle.close() - if self.completed is False: - os.remove(name) - - def _get_write_func(self): - def write_func(data): - if self.stopped is False and self.file_handle is not None: - self.file_handle.write(data) - return write_func - - -class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): - def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, storage, wallet, download_directory): - super().__init__(conf, peer_finder, rate_limiter, blob_manager, storage, wallet) - self.download_directory = hexlify(download_directory.encode()) - - def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - stream_name = stream_info.raw_info['stream_name'] - key = stream_info.raw_info['key'] - suggested_file_name = stream_info.raw_info['suggested_file_name'] - return EncryptedFileSaver( - self.conf, stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.storage, - payment_rate_manager, self.wallet, self.download_directory, key=key, stream_name=stream_name, - file_name=suggested_file_name - ) - - @staticmethod - def get_description(): - return "Save" diff --git a/lbrynet/blob/client/EncryptedFileMetadataHandler.py b/lbrynet/blob/client/EncryptedFileMetadataHandler.py deleted file mode 100644 index e7a7b1c25..000000000 --- a/lbrynet/blob/client/EncryptedFileMetadataHandler.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -from twisted.internet import defer -from lbrynet.extras.compat import f2d - - -log = logging.getLogger(__name__) - - -class EncryptedFileMetadataHandler: - - def __init__(self, stream_hash, storage, download_manager): - self.stream_hash = stream_hash - self.storage = storage - self.download_manager = download_manager - self._final_blob_num = None - - ######### IMetadataHandler ######### - - @defer.inlineCallbacks - def get_initial_blobs(self): - blob_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash)) - return self._format_initial_blobs_for_download_manager(blob_infos) - - def final_blob_num(self): - return self._final_blob_num - - ######### internal calls ######### - - def _format_initial_blobs_for_download_manager(self, blob_infos): - infos = [] - for i, crypt_blob in enumerate(blob_infos): - if crypt_blob.blob_hash is not None and crypt_blob.length: - infos.append(crypt_blob) - else: - if i != len(blob_infos) - 1: - raise Exception("Invalid stream terminator: %i of %i" % - (i, len(blob_infos) - 1)) - log.debug("Setting _final_blob_num to %s", str(crypt_blob.blob_num - 1)) - self._final_blob_num = crypt_blob.blob_num - 1 - return infos diff --git a/lbrynet/blob/client/EncryptedFileOptions.py b/lbrynet/blob/client/EncryptedFileOptions.py deleted file mode 100644 index ca4421d35..000000000 --- a/lbrynet/blob/client/EncryptedFileOptions.py +++ /dev/null @@ -1,47 +0,0 @@ -from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamType -from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamDescriptorValidator -from lbrynet.p2p.DownloadOption import DownloadOption, DownloadOptionChoice - - -def add_lbry_file_to_sd_identifier(sd_identifier): - sd_identifier.add_stream_type(EncryptedFileStreamType, EncryptedFileStreamDescriptorValidator, - EncryptedFileOptions()) - - -class EncryptedFileOptions: - def __init__(self): - pass - - def get_downloader_options(self, sd_validator, payment_rate_manager): 
- prm = payment_rate_manager - - def get_default_data_rate_description(): - if prm.base.min_blob_data_payment_rate is None: - return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate) - else: - return "%f LBC/MB" % prm.base.min_blob_data_payment_rate - - rate_choices = [] - rate_choices.append(DownloadOptionChoice( - prm.base.min_blob_data_payment_rate, - "No change - %s" % get_default_data_rate_description(), - "No change - %s" % get_default_data_rate_description())) - if prm.base.min_blob_data_payment_rate is not None: - rate_choices.append(DownloadOptionChoice( - None, - "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate), - "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate))) - rate_choices.append(DownloadOptionChoice(float, - "Enter rate in LBC/MB", - "Enter rate in LBC/MB")) - - options = [ - DownloadOption( - rate_choices, - "Rate which will be paid for data", - "data payment rate", - prm.base.min_blob_data_payment_rate, - get_default_data_rate_description() - ), - ] - return options diff --git a/lbrynet/extras/daemon/Downloader.py b/lbrynet/extras/daemon/Downloader.py deleted file mode 100644 index 129cc5f4c..000000000 --- a/lbrynet/extras/daemon/Downloader.py +++ /dev/null @@ -1,209 +0,0 @@ -import logging -import os -from twisted.internet import defer - -from lbrynet.conf import Config -from lbrynet.schema.fee import Fee - -from lbrynet.p2p.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError -from lbrynet.p2p.Error import DownloadDataTimeout, DownloadCanceledError -from lbrynet.p2p.StreamDescriptor import download_sd_blob -from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from torba.client.constants import COIN -from lbrynet.extras.wallet.dewies import dewies_to_lbc -from lbrynet.extras.compat import f2d - -INITIALIZING_CODE = 'initializing' -DOWNLOAD_METADATA_CODE = 'downloading_metadata' -DOWNLOAD_TIMEOUT_CODE = 'timeout' -DOWNLOAD_RUNNING_CODE = 'running' -DOWNLOAD_STOPPED_CODE = 'stopped' -STREAM_STAGES = [ - (INITIALIZING_CODE, 'Initializing'), - (DOWNLOAD_METADATA_CODE, 'Downloading metadata'), - (DOWNLOAD_RUNNING_CODE, 'Started stream'), - (DOWNLOAD_STOPPED_CODE, 'Paused stream'), - (DOWNLOAD_TIMEOUT_CODE, 'Stream timed out') -] - - -log = logging.getLogger(__name__) - - -class GetStream: - def __init__(self, conf: Config, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, - rate_limiter, payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, - timeout=None, reactor=None): - if not reactor: - from twisted.internet import reactor - self.conf = conf - self.reactor = reactor - self.timeout = timeout or conf.download_timeout - self.data_rate = data_rate or conf.data_rate - self.max_key_fee = max_key_fee or conf.max_key_fee - self.disable_max_key_fee = disable_max_key_fee or conf.disable_max_key_fee - self.download_directory = conf.download_dir - self.timeout_counter = 0 - self.code = None - self.sd_hash = None - self.blob_manager = blob_manager - self.peer_finder = peer_finder - self.rate_limiter = rate_limiter - self.wallet = wallet - self.exchange_rate_manager = exchange_rate_manager - self.payment_rate_manager = payment_rate_manager - self.sd_identifier = sd_identifier - self.storage = storage - self.downloader = None - - # fired when the download is complete - self.finished_deferred = None - # fired after the metadata and the first data blob have been downloaded - 
self.data_downloading_deferred = defer.Deferred(None) - self.wrote_data = False - - @property - def download_path(self): - return os.path.join(self.download_directory, self.downloader.file_name) - - def convert_max_fee(self): - currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] - return self.exchange_rate_manager.convert_currency(currency, "LBC", amount) - - def set_status(self, status, name): - log.info("Download lbry://%s status changed to %s" % (name, status)) - self.code = next(s for s in STREAM_STAGES if s[0] == status) - - @defer.inlineCallbacks - def check_fee_and_convert(self, fee): - max_key_fee_amount = self.convert_max_fee() - converted_fee_amount = self.exchange_rate_manager.convert_currency(fee.currency, "LBC", - fee.amount) - if converted_fee_amount > (yield f2d(self.wallet.default_account.get_balance())): - raise InsufficientFundsError('Unable to pay the key fee of %s' % converted_fee_amount) - if converted_fee_amount > max_key_fee_amount and not self.disable_max_key_fee: - raise KeyFeeAboveMaxAllowed('Key fee {} above max allowed {}'.format(converted_fee_amount, - max_key_fee_amount)) - converted_fee = { - 'currency': 'LBC', - 'amount': converted_fee_amount, - 'address': fee.address - } - return Fee(converted_fee) - - def get_downloader_factory(self, factories): - for factory in factories: - if isinstance(factory, ManagedEncryptedFileDownloaderFactory): - return factory - raise Exception(f'No suitable factory was found in {factories}') - - @defer.inlineCallbacks - def get_downloader(self, factory, stream_metadata, file_name=None): - # TODO: we should use stream_metadata.options.get_downloader_options - # instead of hard-coding the options to be [self.data_rate] - downloader = yield factory.make_downloader( - stream_metadata, - self.data_rate, - self.payment_rate_manager, - self.download_directory, - file_name=file_name - ) - defer.returnValue(downloader) - - def _pay_key_fee(self, address, fee_lbc, name): - log.info("Pay key fee %s --> %s", dewies_to_lbc(fee_lbc), address) - reserved_points = self.wallet.reserve_points(address, fee_lbc) - if reserved_points is None: - raise InsufficientFundsError( - 'Unable to pay the key fee of {} for {}'.format(dewies_to_lbc(fee_lbc), name) - ) - return f2d(self.wallet.send_points_to_address(reserved_points, fee_lbc)) - - @defer.inlineCallbacks - def pay_key_fee(self, fee, name): - if fee is not None: - yield self._pay_key_fee(fee.address.decode(), int(fee.amount * COIN), name) - else: - defer.returnValue(None) - - def finish(self, results, name): - self.set_status(DOWNLOAD_STOPPED_CODE, name) - log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], - self.download_path) - return defer.succeed(self.download_path) - - def fail(self, err): - raise err - - @defer.inlineCallbacks - def _initialize(self, stream_info): - # Set sd_hash and return key_fee from stream_info - self.sd_hash = stream_info.source_hash.decode() - key_fee = None - if stream_info.has_fee: - key_fee = yield self.check_fee_and_convert(stream_info.source_fee) - defer.returnValue(key_fee) - - @defer.inlineCallbacks - def _create_downloader(self, sd_blob, file_name=None): - stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) - factory = self.get_downloader_factory(stream_metadata.factories) - downloader = yield self.get_downloader(factory, stream_metadata, file_name) - defer.returnValue(downloader) - - @defer.inlineCallbacks - def _download_sd_blob(self): - sd_blob = yield download_sd_blob( - self.conf, 
self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter, self.payment_rate_manager, - self.wallet, self.timeout, self.conf.download_mirrors - ) - defer.returnValue(sd_blob) - - @defer.inlineCallbacks - def _download(self, sd_blob, name, key_fee, txid, nout, file_name=None): - self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) - yield self.pay_key_fee(key_fee, name) - yield f2d(self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))) - self.finished_deferred = self.downloader.start() - self.downloader.download_manager.progress_manager.wrote_first_data.addCallback( - self.data_downloading_deferred.callback - ) - self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) - - @defer.inlineCallbacks - def start(self, stream_info, name, txid, nout, file_name=None): - """ - Start download - - Returns: - (tuple) Tuple containing (downloader, finished_deferred) - - downloader - instance of ManagedEncryptedFileDownloader - finished_deferred - deferred callbacked when download is finished - """ - self.set_status(INITIALIZING_CODE, name) - key_fee = yield self._initialize(stream_info) - self.set_status(DOWNLOAD_METADATA_CODE, name) - try: - sd_blob = yield self._download_sd_blob() - yield self._download(sd_blob, name, key_fee, txid, nout, file_name) - self.set_status(DOWNLOAD_RUNNING_CODE, name) - log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) - self.data_downloading_deferred.addTimeout(self.timeout, self.reactor) - try: - yield self.data_downloading_deferred - self.wrote_data = True - except defer.TimeoutError: - raise DownloadDataTimeout("data download timed out") - except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: - raise err - - defer.returnValue((self.downloader, self.finished_deferred)) - - def cancel(self, reason=None): - if reason: - msg = "download stream cancelled: %s" % reason - else: - msg = "download stream cancelled" - if self.data_downloading_deferred and not self.data_downloading_deferred.called: - self.data_downloading_deferred.errback(DownloadCanceledError(msg)) diff --git a/lbrynet/extras/daemon/Publisher.py b/lbrynet/extras/daemon/Publisher.py deleted file mode 100644 index 01b612086..000000000 --- a/lbrynet/extras/daemon/Publisher.py +++ /dev/null @@ -1,77 +0,0 @@ -import asyncio -import logging -import os - -from lbrynet.blob.EncryptedFileCreator import create_lbry_file -from lbrynet.extras.daemon.mime_types import guess_media_type - -log = logging.getLogger(__name__) - - -def d2f(d): - return d.asFuture(asyncio.get_event_loop()) - - -class Publisher: - def __init__(self, account, blob_manager, payment_rate_manager, storage, - lbry_file_manager, wallet, certificate): - self.account = account - self.blob_manager = blob_manager - self.payment_rate_manager = payment_rate_manager - self.storage = storage - self.lbry_file_manager = lbry_file_manager - self.wallet = wallet - self.certificate = certificate - self.lbry_file = None - - async def create_and_publish_stream(self, name, bid, claim_dict, file_path, holding_address=None): - """Create lbry file and make claim""" - log.info('Starting publish for %s', name) - if not os.path.isfile(file_path): - raise Exception(f"File {file_path} not found") - if os.path.getsize(file_path) == 0: - raise Exception(f"Cannot publish empty file {file_path}") - - file_name = os.path.basename(file_path) - with open(file_path, 'rb') as read_handle: - self.lbry_file = await d2f(create_lbry_file( - 
self.blob_manager, self.storage, self.payment_rate_manager, self.lbry_file_manager, file_name, - read_handle - )) - - if 'source' not in claim_dict['stream']: - claim_dict['stream']['source'] = {} - claim_dict['stream']['source']['source'] = self.lbry_file.sd_hash - claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash' - claim_dict['stream']['source']['contentType'] = guess_media_type(file_path) - claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here - tx = await self.wallet.claim_name( - self.account, name, bid, claim_dict, self.certificate, holding_address - ) - - # check if we have a file already for this claim (if this is a publish update with a new stream) - old_stream_hashes = await self.storage.get_old_stream_hashes_for_claim_id( - tx.outputs[0].claim_id, self.lbry_file.stream_hash - ) - if old_stream_hashes: - for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes, - list(self.lbry_file_manager.lbry_files)): - await d2f(self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)) - log.info("Removed old stream for claim update: %s", lbry_file.stream_hash) - - await self.storage.save_content_claim( - self.lbry_file.stream_hash, tx.outputs[0].id - ) - return tx - - async def publish_stream(self, name, bid, claim_dict, stream_hash, holding_address=None): - """Make a claim without creating a lbry file""" - tx = await self.wallet.claim_name( - self.account, name, bid, claim_dict, self.certificate, holding_address - ) - if stream_hash: # the stream_hash returned from the db will be None if this isn't a stream we have - await self.storage.save_content_claim( - stream_hash, tx.outputs[0].id - ) - self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0] - return tx diff --git a/lbrynet/p2p/DownloadOption.py b/lbrynet/p2p/DownloadOption.py deleted file mode 100644 index d256e9be8..000000000 --- a/lbrynet/p2p/DownloadOption.py +++ /dev/null @@ -1,21 +0,0 @@ -class DownloadOptionChoice: - """A possible choice that can be picked for some option. - - An option can have one or more choices that can be picked from. - """ - def __init__(self, value, short_description, long_description, bool_options_description=None): - self.value = value - self.short_description = short_description - self.long_description = long_description - self.bool_options_description = bool_options_description - - -class DownloadOption: - """An option for a user to select a value from several different choices.""" - def __init__(self, option_types, long_description, short_description, default_value, - default_value_description): - self.option_types = option_types - self.long_description = long_description - self.short_description = short_description - self.default_value = default_value - self.default_value_description = default_value_description diff --git a/lbrynet/p2p/HTTPBlobDownloader.py b/lbrynet/p2p/HTTPBlobDownloader.py deleted file mode 100644 index b7941e52a..000000000 --- a/lbrynet/p2p/HTTPBlobDownloader.py +++ /dev/null @@ -1,190 +0,0 @@ -import logging -import treq -from random import choice -from twisted.internet import defer, task -from twisted.internet.error import ConnectingCancelledError -from twisted.web._newclient import ResponseNeverReceived - -from lbrynet.extras.compat import f2d -from lbrynet.p2p.Error import DownloadCanceledError - -log = logging.getLogger(__name__) - - -class HTTPBlobDownloader: - ''' - A downloader that is able to get blobs from HTTP mirrors. 
- Note that when a blob gets downloaded from a mirror or from a peer, BlobManager will mark it as completed - and cause any other type of downloader to progress to the next missing blob. Also, BlobFile is naturally able - to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between - different types of downloaders. - ''' - def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True, - clock=None): - if not clock: - from twisted.internet import reactor - self.clock = reactor - else: - self.clock = clock - self.blob_manager = blob_manager - self.servers = servers or [] - self.client = client or treq - self.blob_hashes = blob_hashes or [] - self.missing_blob_hashes = [] - self.downloaded_blob_hashes = [] - self.sd_hashes = sd_hashes or [] - self.head_blob_hashes = [] - self.max_failures = 3 - self.semaphore = defer.DeferredSemaphore(2) - self.deferreds = [] - self.writers = [] - self.retry = retry - self.looping_call = task.LoopingCall(self._download_lc) - self.looping_call.clock = self.clock - self.finished_deferred = defer.Deferred() - self.finished_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) - self.short_delay = 30 - self.long_delay = 600 - self.delay = self.short_delay - self.last_missing = 10000000 - self.lc_deferred = None - - @defer.inlineCallbacks - def start(self): - if not self.looping_call.running: - self.lc_deferred = self.looping_call.start(self.short_delay, now=True) - self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) - yield self.finished_deferred - - def stop(self): - for d in reversed(self.deferreds): - d.cancel() - while self.writers: - writer = self.writers.pop() - writer.close(DownloadCanceledError()) - self.blob_hashes = [] - if self.looping_call.running: - self.looping_call.stop() - if self.lc_deferred and not self.lc_deferred.called: - self.lc_deferred.cancel() - if not self.finished_deferred.called: - self.finished_deferred.cancel() - - @defer.inlineCallbacks - def _download_lc(self): - delay = yield self._download_and_get_retry_delay() - log.debug("delay: %s, missing: %i, downloaded from mirror: %i", delay, len(self.missing_blob_hashes), - len(self.downloaded_blob_hashes)) - while self.missing_blob_hashes: - self.blob_hashes.append(self.missing_blob_hashes.pop()) - if not delay: - if self.looping_call.running: - self.looping_call.stop() - if not self.finished_deferred.called: - log.debug("mirror finished") - self.finished_deferred.callback(None) - elif delay and delay != self.delay: - if delay == self.long_delay: - log.debug("mirror not making progress, trying less frequently") - elif delay == self.short_delay: - log.debug("queueing retry of %i blobs", len(self.missing_blob_hashes)) - if self.looping_call.running: - self.looping_call.stop() - self.delay = delay - self.looping_call = task.LoopingCall(self._download_lc) - self.looping_call.clock = self.clock - self.lc_deferred = self.looping_call.start(self.delay, now=False) - self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) - yield self.finished_deferred - - @defer.inlineCallbacks - def _download_and_get_retry_delay(self): - if self.blob_hashes and self.servers: - if self.sd_hashes: - log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8]) - else: - log.debug("trying to download %i blobs from mirror", len(self.blob_hashes)) - blobs = {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} - self.deferreds = 
[self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] - yield defer.DeferredList(self.deferreds) - if self.retry and self.missing_blob_hashes: - if not self.downloaded_blob_hashes: - defer.returnValue(self.long_delay) - if len(self.missing_blob_hashes) < self.last_missing: - self.last_missing = len(self.missing_blob_hashes) - defer.returnValue(self.short_delay) - if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing: - defer.returnValue(self.long_delay) - defer.returnValue(None) - - @defer.inlineCallbacks - def _download_blob(self, blob): - for _ in range(self.max_failures): - writer, finished_deferred = blob.open_for_writing('mirror') - self.writers.append(writer) - try: - downloaded = yield self._write_blob(writer, blob) - if downloaded: - yield finished_deferred # yield for verification errors, so we log them - if blob.verified: - log.info('Mirror completed download for %s', blob.blob_hash) - should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes - yield self.blob_manager.blob_completed(blob, should_announce=should_announce) - self.downloaded_blob_hashes.append(blob.blob_hash) - break - except (IOError, Exception, defer.CancelledError, ConnectingCancelledError, ResponseNeverReceived) as e: - if isinstance( - e, (DownloadCanceledError, defer.CancelledError, ConnectingCancelledError, - ResponseNeverReceived) - ) or 'closed file' in str(e): - # some other downloader finished first or it was simply cancelled - log.info("Mirror download cancelled: %s", blob.blob_hash) - break - else: - log.exception('Mirror failed downloading') - finally: - finished_deferred.addBoth(lambda _: None) # suppress echoed errors - if 'mirror' in blob.writers: - writer.close() - self.writers.remove(writer) - - def download_blob(self, blob): - if not blob.verified: - d = self.semaphore.run(self._download_blob, blob) - d.addErrback(lambda err: err.trap(defer.TimeoutError, defer.CancelledError)) - return d - return defer.succeed(None) - - @defer.inlineCallbacks - def _write_blob(self, writer, blob): - response = yield self.client.get(url_for('{}:{}'.format(*choice(self.servers)), blob.blob_hash)) - if response.code != 200: - log.debug('Missing a blob: %s', blob.blob_hash) - if blob.blob_hash in self.blob_hashes: - self.blob_hashes.remove(blob.blob_hash) - if blob.blob_hash not in self.missing_blob_hashes: - self.missing_blob_hashes.append(blob.blob_hash) - defer.returnValue(False) - - log.debug('Download started: %s', blob.blob_hash) - blob.set_length(response.length) - yield self.client.collect(response, writer.write) - defer.returnValue(True) - - @defer.inlineCallbacks - def download_stream(self, stream_hash, sd_hash): - stream_crypt_blobs = yield f2d(self.blob_manager.storage.get_blobs_for_stream(stream_hash)) - self.blob_hashes.extend([ - b.blob_hash for b in stream_crypt_blobs - if b.blob_hash and b.blob_hash not in self.blob_hashes - ]) - if sd_hash not in self.sd_hashes: - self.sd_hashes.append(sd_hash) - head_blob_hash = stream_crypt_blobs[0].blob_hash - if head_blob_hash not in self.head_blob_hashes: - self.head_blob_hashes.append(head_blob_hash) - yield self.start() - - -def url_for(server, blob_hash=''): - return f'http://{server}/{blob_hash}' diff --git a/lbrynet/p2p/Offer.py b/lbrynet/p2p/Offer.py deleted file mode 100644 index 883655ef6..000000000 --- a/lbrynet/p2p/Offer.py +++ /dev/null @@ -1,62 +0,0 @@ -from decimal import Decimal - - -class Offer: - """A rate offer to download blobs from a host.""" - - 
RATE_ACCEPTED = "RATE_ACCEPTED" - RATE_TOO_LOW = "RATE_TOO_LOW" - RATE_UNSET = "RATE_UNSET" - - def __init__(self, offer): - self._state = None - self.rate = None - if isinstance(offer, Decimal): - self.rate = round(offer, 5) - elif isinstance(offer, float): - self.rate = round(Decimal(offer), 5) - if self.rate is None or self.rate < Decimal(0.0): - self.unset() - - @property - def is_accepted(self): - return self._state is Offer.RATE_ACCEPTED - - @property - def is_too_low(self): - return self._state is Offer.RATE_TOO_LOW - - @property - def is_unset(self): - return self._state is Offer.RATE_UNSET - - @property - def message(self): - if self.is_accepted: - return Offer.RATE_ACCEPTED - elif self.is_too_low: - return Offer.RATE_TOO_LOW - elif self.is_unset: - return Offer.RATE_UNSET - return None - - def accept(self): - if self.is_unset or self._state is None: - self._state = Offer.RATE_ACCEPTED - - def reject(self): - if self.is_unset or self._state is None: - self._state = Offer.RATE_TOO_LOW - - def unset(self): - self._state = Offer.RATE_UNSET - - def handle(self, reply_message): - if reply_message == Offer.RATE_TOO_LOW: - self.reject() - elif reply_message == Offer.RATE_ACCEPTED: - self.accept() - elif reply_message == Offer.RATE_UNSET: - self.unset() - else: - raise Exception("Unknown offer reply %s" % str(reply_message)) diff --git a/lbrynet/p2p/PaymentRateManager.py b/lbrynet/p2p/PaymentRateManager.py deleted file mode 100644 index 86d21c960..000000000 --- a/lbrynet/p2p/PaymentRateManager.py +++ /dev/null @@ -1,126 +0,0 @@ -from decimal import Decimal -from lbrynet.p2p.Strategy import get_default_strategy, OnlyFreeStrategy - - -class BasePaymentRateManager: - def __init__(self, rate, info_rate): - self.min_blob_data_payment_rate = rate - self.min_blob_info_payment_rate = info_rate - - -class PaymentRateManager: - def __init__(self, base, rate=None): - """ - @param base: a BasePaymentRateManager - - @param rate: the min blob data payment rate - """ - self.base = base - self.min_blob_data_payment_rate = rate - self.points_paid = 0.0 - - def get_rate_blob_data(self, peer): - return self.get_effective_min_blob_data_payment_rate() - - def accept_rate_blob_data(self, peer, payment_rate): - return payment_rate >= self.get_effective_min_blob_data_payment_rate() - - def get_effective_min_blob_data_payment_rate(self): - if self.min_blob_data_payment_rate is None: - return self.base.min_blob_data_payment_rate - return self.min_blob_data_payment_rate - - def record_points_paid(self, amount): - self.points_paid += amount - - -class NegotiatedPaymentRateManager: - def __init__(self, base, availability_tracker, generous): - """ - @param base: a BasePaymentRateManager - @param availability_tracker: a BlobAvailabilityTracker - @param rate: the min blob data payment rate - """ - - self.base = base - self.min_blob_data_payment_rate = base.min_blob_data_payment_rate - self.points_paid = 0.0 - self.blob_tracker = availability_tracker - self.generous = generous - self.strategy = get_default_strategy( - self.blob_tracker, self.base.min_blob_data_payment_rate, generous - ) - - def get_rate_blob_data(self, peer, blobs): - response = self.strategy.make_offer(peer, blobs) - return response.rate - - def accept_rate_blob_data(self, peer, blobs, offer): - offer = self.strategy.respond_to_offer(offer, peer, blobs) - self.strategy.update_accepted_offers(peer, offer) - return offer.is_accepted - - def reply_to_offer(self, peer, blobs, offer): - reply = self.strategy.respond_to_offer(offer, peer, blobs) - 
self.strategy.update_accepted_offers(peer, reply) - return reply - - def get_rate_for_peer(self, peer): - return self.strategy.accepted_offers.get(peer, False) - - def record_points_paid(self, amount): - self.points_paid += amount - - def record_offer_reply(self, peer, offer): - self.strategy.update_accepted_offers(peer, offer) - - def price_limit_reached(self, peer): - if peer in self.strategy.pending_sent_offers: - offer = self.strategy.pending_sent_offers[peer] - return (offer.is_too_low and - round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5)) - return False - - -class OnlyFreePaymentsManager: - def __init__(self, **kwargs): - """ - A payment rate manager that will only ever accept and offer a rate of 0.0, - Used for testing - """ - - self.base = BasePaymentRateManager(0.0, 0.0) - self.points_paid = 0.0 - self.min_blob_data_payment_rate = 0.0 - self.generous = True - self.strategy = OnlyFreeStrategy() - - def get_rate_blob_data(self, peer, blobs): - response = self.strategy.make_offer(peer, blobs) - return response.rate - - def accept_rate_blob_data(self, peer, blobs, offer): - offer = self.strategy.respond_to_offer(offer, peer, blobs) - self.strategy.update_accepted_offers(peer, offer) - return offer.is_accepted - - def reply_to_offer(self, peer, blobs, offer): - reply = self.strategy.respond_to_offer(offer, peer, blobs) - self.strategy.update_accepted_offers(peer, reply) - return reply - - def get_rate_for_peer(self, peer): - return self.strategy.accepted_offers.get(peer, False) - - def record_points_paid(self, amount): - self.points_paid += amount - - def record_offer_reply(self, peer, offer): - self.strategy.update_accepted_offers(peer, offer) - - def price_limit_reached(self, peer): - if peer in self.strategy.pending_sent_offers: - offer = self.strategy.pending_sent_offers[peer] - if offer.rate > 0.0: - return True - return False diff --git a/lbrynet/p2p/StreamDescriptor.py b/lbrynet/p2p/StreamDescriptor.py deleted file mode 100644 index d3144ce92..000000000 --- a/lbrynet/p2p/StreamDescriptor.py +++ /dev/null @@ -1,468 +0,0 @@ -import string -import json -import logging -from collections import defaultdict -from binascii import unhexlify -from twisted.internet import threads, defer - -from lbrynet.extras.compat import f2d -from lbrynet.cryptoutils import get_lbry_hash_obj -from lbrynet.p2p.client.StandaloneBlobDownloader import StandaloneBlobDownloader -from lbrynet.p2p.Error import UnknownStreamTypeError, InvalidStreamDescriptorError -from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader - -log = logging.getLogger(__name__) - - -class JSONBytesEncoder(json.JSONEncoder): - def default(self, obj): # pylint: disable=E0202 - if isinstance(obj, bytes): - return obj.decode() - return super().default(obj) - - -class StreamDescriptorReader: - """Classes which derive from this class read a stream descriptor file return - a dictionary containing the fields in the file""" - def __init__(self): - pass - - def _get_raw_data(self): - """This method must be overridden by subclasses. 
It should return a deferred - which fires with the raw data in the stream descriptor""" - - def get_info(self): - """Return the fields contained in the file""" - d = self._get_raw_data() - d.addCallback(json.loads) - return d - - -class PlainStreamDescriptorReader(StreamDescriptorReader): - """Read a stream descriptor file which is not a blob but a regular file""" - def __init__(self, stream_descriptor_filename): - super().__init__() - self.stream_descriptor_filename = stream_descriptor_filename - - def _get_raw_data(self): - - def get_data(): - with open(self.stream_descriptor_filename) as file_handle: - raw_data = file_handle.read() - return raw_data - - return threads.deferToThread(get_data) - - -class BlobStreamDescriptorReader(StreamDescriptorReader): - """Read a stream descriptor file which is a blob""" - def __init__(self, blob): - super().__init__() - self.blob = blob - - def _get_raw_data(self): - - def get_data(): - f = self.blob.open_for_reading() - if f is not None: - raw_data = f.read() - f.close() - return raw_data - else: - raise ValueError("Could not open the blob for reading") - - return threads.deferToThread(get_data) - - -class StreamDescriptorWriter: - """Classes which derive from this class write fields from a dictionary - of fields to a stream descriptor""" - def __init__(self): - pass - - def create_descriptor(self, sd_info): - return self._write_stream_descriptor( - json.dumps(sd_info, sort_keys=True).encode() - ) - - def _write_stream_descriptor(self, raw_data): - """This method must be overridden by subclasses to write raw data to - the stream descriptor - """ - - -class PlainStreamDescriptorWriter(StreamDescriptorWriter): - def __init__(self, sd_file_name): - super().__init__() - self.sd_file_name = sd_file_name - - def _write_stream_descriptor(self, raw_data): - - def write_file(): - log.info("Writing the sd file to disk") - with open(self.sd_file_name, 'w') as sd_file: - sd_file.write(raw_data) - return self.sd_file_name - - return threads.deferToThread(write_file) - - -class BlobStreamDescriptorWriter(StreamDescriptorWriter): - def __init__(self, blob_manager): - super().__init__() - self.blob_manager = blob_manager - - @defer.inlineCallbacks - def _write_stream_descriptor(self, raw_data): - log.debug("Creating the new blob for the stream descriptor") - blob_creator = self.blob_manager.get_blob_creator() - blob_creator.write(raw_data) - log.debug("Wrote the data to the new blob") - sd_hash = yield blob_creator.close() - yield self.blob_manager.creator_finished(blob_creator, should_announce=True) - defer.returnValue(sd_hash) - - -class StreamMetadata: - FROM_BLOB = 1 - FROM_PLAIN = 2 - - def __init__(self, validator, options, factories): - self.validator = validator - self.options = options - self.factories = factories - self.metadata_source = None - self.source_blob_hash = None - self.source_file = None - - -class StreamDescriptorIdentifier: - """Tries to determine the type of stream described by the stream descriptor using the - 'stream_type' field. 
Keeps a list of StreamDescriptorValidators and StreamDownloaderFactorys - and returns the appropriate ones based on the type of the stream descriptor given - """ - def __init__(self): - # {stream_type: IStreamDescriptorValidator} - self._sd_info_validators = {} - # {stream_type: IStreamOptions - self._stream_options = {} - # {stream_type: [IStreamDownloaderFactory]} - self._stream_downloader_factories = defaultdict(list) - - def add_stream_type(self, stream_type, sd_info_validator, stream_options): - """This is how the StreamDescriptorIdentifier learns about new types of stream descriptors. - - There can only be one StreamDescriptorValidator for each type of stream. - - @param stream_type: A string representing the type of stream - descriptor. This must be unique to this stream descriptor. - - @param sd_info_validator: A class implementing the - IStreamDescriptorValidator interface. This class's - constructor will be passed the raw metadata in the stream - descriptor file and its 'validate' method will then be - called. If the validation step fails, an exception will be - thrown, preventing the stream descriptor from being - further processed. - - @param stream_options: A class implementing the IStreamOptions - interface. This class's constructor will be passed the - sd_info_validator object containing the raw metadata from - the stream descriptor file. - - @return: None - - """ - self._sd_info_validators[stream_type] = sd_info_validator - self._stream_options[stream_type] = stream_options - - def add_stream_downloader_factory(self, stream_type, factory): - """Register a stream downloader factory with the StreamDescriptorIdentifier. - - This is how the StreamDescriptorIdentifier determines what - factories may be used to process different stream descriptor - files. There must be at least one factory for each type of - stream added via "add_stream_info_validator". - - @param stream_type: A string representing the type of stream - descriptor which the factory knows how to process. - - @param factory: An object implementing the IStreamDownloaderFactory interface. 
- - @return: None - - """ - self._stream_downloader_factories[stream_type].append(factory) - - def _return_metadata(self, options_validator_factories, source_type, source): - validator, options, factories = options_validator_factories - m = StreamMetadata(validator, options, factories) - m.metadata_source = source_type - if source_type == StreamMetadata.FROM_BLOB: - m.source_blob_hash = source - if source_type == StreamMetadata.FROM_PLAIN: - m.source_file = source - return m - - def get_metadata_for_sd_file(self, sd_path): - sd_reader = PlainStreamDescriptorReader(sd_path) - d = sd_reader.get_info() - d.addCallback(self._return_options_and_validator_and_factories) - d.addCallback(self._return_metadata, StreamMetadata.FROM_PLAIN, sd_path) - return d - - def get_metadata_for_sd_blob(self, sd_blob): - sd_reader = BlobStreamDescriptorReader(sd_blob) - d = sd_reader.get_info() - d.addCallback(self._return_options_and_validator_and_factories) - d.addCallback(self._return_metadata, StreamMetadata.FROM_BLOB, sd_blob.blob_hash) - return d - - def _get_factories(self, stream_type): - if not stream_type in self._stream_downloader_factories: - raise UnknownStreamTypeError(stream_type) - return self._stream_downloader_factories[stream_type] - - def _get_validator(self, stream_type): - if not stream_type in self._sd_info_validators: - raise UnknownStreamTypeError(stream_type) - return self._sd_info_validators[stream_type] - - def _get_options(self, stream_type): - if not stream_type in self._stream_downloader_factories: - raise UnknownStreamTypeError(stream_type) - return self._stream_options[stream_type] - - def _return_options_and_validator_and_factories(self, sd_info): - if not 'stream_type' in sd_info: - raise InvalidStreamDescriptorError('No stream_type parameter in stream descriptor.') - stream_type = sd_info['stream_type'] - validator = self._get_validator(stream_type)(sd_info) - factories = [f for f in self._get_factories(stream_type) if f.can_download(validator)] - - d = validator.validate() - - def get_options(): - options = self._get_options(stream_type) - return validator, options, factories - - d.addCallback(lambda _: get_options()) - return d - - -EncryptedFileStreamType = "lbryfile" - - -@defer.inlineCallbacks -def save_sd_info(blob_manager, sd_hash, sd_info): - if not blob_manager.blobs.get(sd_hash) or not blob_manager.blobs[sd_hash].get_is_verified(): - descriptor_writer = BlobStreamDescriptorWriter(blob_manager) - calculated_sd_hash = yield descriptor_writer.create_descriptor(sd_info) - if calculated_sd_hash != sd_hash: - raise InvalidStreamDescriptorError("%s does not match calculated %s" % - (sd_hash, calculated_sd_hash)) - stream_hash = yield f2d(blob_manager.storage.get_stream_hash_for_sd_hash(sd_hash)) - if not stream_hash: - log.debug("Saving info for %s", unhexlify(sd_info['stream_name'])) - stream_name = sd_info['stream_name'] - key = sd_info['key'] - stream_hash = sd_info['stream_hash'] - stream_blobs = sd_info['blobs'] - suggested_file_name = sd_info['suggested_file_name'] - yield f2d(blob_manager.storage.add_known_blobs(stream_blobs)) - yield f2d(blob_manager.storage.store_stream( - stream_hash, sd_hash, stream_name, key, suggested_file_name, stream_blobs - )) - defer.returnValue(stream_hash) - - -def format_blobs(crypt_blob_infos): - formatted_blobs = [] - for blob_info in crypt_blob_infos: - blob = {} - if blob_info.length != 0: - blob['blob_hash'] = blob_info.blob_hash - blob['blob_num'] = blob_info.blob_num - blob['iv'] = blob_info.iv - blob['length'] = blob_info.length 
- formatted_blobs.append(blob) - return formatted_blobs - - -def format_sd_info(stream_type, stream_name, key, suggested_file_name, stream_hash, blobs): - return { - "stream_type": stream_type, - "stream_name": stream_name, - "key": key, - "suggested_file_name": suggested_file_name, - "stream_hash": stream_hash, - "blobs": blobs - } - - -async def get_sd_info(storage, stream_hash, include_blobs): - """ - Get an sd info dictionary from storage - - :param storage: (SQLiteStorage) storage instance - :param stream_hash: (str) stream hash - :param include_blobs: (bool) include stream blob infos - - :return: { - "stream_type": "lbryfile", - "stream_name": , - "key": , - "suggested_file_name": , - "stream_hash": , - "blobs": [ - { - "blob_hash": , - "blob_num": 0, - "iv": , - "length": - }, ... - { - "blob_num": , - "iv": , - "length": 0 - } - ] - } - """ - stream_info = await storage.get_stream_info(stream_hash) - blobs = [] - if include_blobs: - blobs = await storage.get_blobs_for_stream(stream_hash) - return format_sd_info( - EncryptedFileStreamType, stream_info[0], stream_info[1], - stream_info[2], stream_hash, format_blobs(blobs) - ) - - -def get_blob_hashsum(b): - length = b['length'] - if length != 0: - blob_hash = b['blob_hash'] - else: - blob_hash = None - blob_num = b['blob_num'] - iv = b['iv'] - blob_hashsum = get_lbry_hash_obj() - if length != 0: - blob_hashsum.update(blob_hash.encode()) - blob_hashsum.update(str(blob_num).encode()) - blob_hashsum.update(iv.encode()) - blob_hashsum.update(str(length).encode()) - return blob_hashsum.digest() - - -def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos): - h = get_lbry_hash_obj() - h.update(hex_stream_name.encode()) - h.update(key.encode()) - h.update(hex_suggested_file_name.encode()) - blobs_hashsum = get_lbry_hash_obj() - for blob in blob_infos: - blobs_hashsum.update(get_blob_hashsum(blob)) - h.update(blobs_hashsum.digest()) - return h.hexdigest() - - -def verify_hex(text, field_name): - if not set(text).issubset(set(string.hexdigits)): - raise InvalidStreamDescriptorError("%s is not a hex-encoded string" % field_name) - - -def validate_descriptor(stream_info): - try: - hex_stream_name = stream_info['stream_name'] - key = stream_info['key'] - hex_suggested_file_name = stream_info['suggested_file_name'] - stream_hash = stream_info['stream_hash'] - blobs = stream_info['blobs'] - except KeyError as e: - raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0])) - if stream_info['blobs'][-1]['length'] != 0: - raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") - if any([blob_info['length'] == 0 for blob_info in stream_info['blobs'][:-1]]): - raise InvalidStreamDescriptorError("Contains zero-length data blob") - if 'blob_hash' in stream_info['blobs'][-1]: - raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") - - verify_hex(key, "key") - verify_hex(hex_suggested_file_name, "suggested file name") - verify_hex(stream_hash, "stream_hash") - - calculated_stream_hash = get_stream_hash( - hex_stream_name, key, hex_suggested_file_name, blobs - ) - if calculated_stream_hash != stream_hash: - raise InvalidStreamDescriptorError("Stream hash does not match stream metadata") - return True - - -class EncryptedFileStreamDescriptorValidator: - def __init__(self, raw_info): - self.raw_info = raw_info - - def validate(self): - return defer.succeed(validate_descriptor(self.raw_info)) - - def info_to_show(self): - info = [] - info.append(("stream_name", 
unhexlify(self.raw_info.get("stream_name")))) - size_so_far = 0 - for blob_info in self.raw_info.get("blobs", []): - size_so_far += int(blob_info['length']) - info.append(("stream_size", str(self.get_length_of_stream()))) - suggested_file_name = self.raw_info.get("suggested_file_name", None) - if suggested_file_name is not None: - suggested_file_name = unhexlify(suggested_file_name) - info.append(("suggested_file_name", suggested_file_name)) - return info - - def get_length_of_stream(self): - size_so_far = 0 - for blob_info in self.raw_info.get("blobs", []): - size_so_far += int(blob_info['length']) - return size_so_far - - -@defer.inlineCallbacks -def download_sd_blob(conf, blob_hash, blob_manager, peer_finder, rate_limiter, payment_rate_manager, wallet, - timeout=None, download_mirrors=None): - """ - Downloads a single blob from the network - - @param session: - - @param blob_hash: - - @param payment_rate_manager: - - @return: An object of type HashBlob - """ - - downloader = StandaloneBlobDownloader(conf, - blob_hash, - blob_manager, - peer_finder, - rate_limiter, - payment_rate_manager, - wallet, - timeout) - mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash], retry=False) - mirror.start() - sd_blob = yield downloader.download() - mirror.stop() - sd_reader = BlobStreamDescriptorReader(sd_blob) - sd_info = yield sd_reader.get_info() - try: - validate_descriptor(sd_info) - except InvalidStreamDescriptorError as err: - yield blob_manager.delete_blobs([blob_hash]) - raise err - raw_sd = yield sd_reader._get_raw_data() - yield f2d(blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))) - yield save_sd_info(blob_manager, sd_blob.blob_hash, sd_info) - defer.returnValue(sd_blob) diff --git a/lbrynet/p2p/__init__.py b/lbrynet/p2p/__init__.py deleted file mode 100644 index 6ac1f3432..000000000 --- a/lbrynet/p2p/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Classes and functions which can be used by any application wishing to make use of the LBRY network. - -This includes classes for connecting to other peers and downloading blobs from them, listening for -connections from peers and responding to their requests, managing locally stored blobs, sending -and receiving payments, and locating peers in the DHT. 
-""" diff --git a/lbrynet/p2p/client/ConnectionManager.py b/lbrynet/p2p/client/ConnectionManager.py deleted file mode 100644 index f54ed8903..000000000 --- a/lbrynet/p2p/client/ConnectionManager.py +++ /dev/null @@ -1,227 +0,0 @@ -import random -import logging -from twisted.internet import defer, reactor -from lbrynet import utils -from lbrynet.conf import Config -from lbrynet.p2p.client.ClientProtocol import ClientProtocolFactory -from lbrynet.p2p.Error import InsufficientFundsError - -log = logging.getLogger(__name__) - - -class PeerConnectionHandler: - def __init__(self, request_creators, factory): - self.request_creators = request_creators - self.factory = factory - self.connection = None - - -class ConnectionManager: - #implements(interfaces.IConnectionManager) - MANAGE_CALL_INTERVAL_SEC = 5 - TCP_CONNECT_TIMEOUT = 15 - - def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators): - - self.conf: Config = downloader.conf - self.seek_head_blob_first = self.conf.seek_head_blob_first - self.max_connections_per_stream = self.conf.max_connections_per_stream - - self.downloader = downloader - self.rate_limiter = rate_limiter - self._primary_request_creators = primary_request_creators - self._secondary_request_creators = secondary_request_creators - self._peer_connections = {} # {Peer: PeerConnectionHandler} - self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} - self._next_manage_call = None - # a deferred that gets fired when a _manage call is set - self._manage_deferred = None - self.stopped = True - log.debug("%s initialized", self._get_log_name()) - - # this identifies what the connection manager is for, - # used for logging purposes only - def _get_log_name(self): - out = 'Connection Manager Unknown' - if hasattr(self.downloader, 'stream_name'): - out = 'Connection Manager '+self.downloader.stream_name - elif hasattr(self.downloader, 'blob_hash'): - out = 'Connection Manager '+self.downloader.blob_hash - return out - - def _start(self): - self.stopped = False - if self._next_manage_call is not None and self._next_manage_call.active() is True: - self._next_manage_call.cancel() - - def start(self): - log.debug("%s starting", self._get_log_name()) - self._start() - self._next_manage_call = utils.call_later(0, self.manage) - return defer.succeed(True) - - - @defer.inlineCallbacks - def stop(self): - log.debug("%s stopping", self._get_log_name()) - self.stopped = True - # wait for the current manage call to finish - if self._manage_deferred: - yield self._manage_deferred - # in case we stopped between manage calls, cancel the next one - if self._next_manage_call and self._next_manage_call.active(): - self._next_manage_call.cancel() - self._next_manage_call = None - yield self._close_peers() - - def num_peer_connections(self): - return len(self._peer_connections) - - def _close_peers(self): - def disconnect_peer(p): - d = defer.Deferred() - self._connections_closing[p] = d - self._peer_connections[p].connection.disconnect() - if p in self._peer_connections: - del self._peer_connections[p] - return d - - def close_connection(p): - log.debug("%s Abruptly closing a connection to %s due to downloading being paused", - self._get_log_name(), p) - if self._peer_connections[p].factory.p is not None: - d = self._peer_connections[p].factory.p.cancel_requests() - else: - d = defer.succeed(True) - d.addBoth(lambda _: disconnect_peer(p)) - return d - - # fixme: stop modifying dict during iteration - closing_deferreds = 
[close_connection(peer) for peer in list(self._peer_connections)] - return defer.DeferredList(closing_deferreds) - - @defer.inlineCallbacks - def get_next_request(self, peer, protocol): - log.debug("%s Trying to get the next request for peer %s", self._get_log_name(), peer) - if not peer in self._peer_connections or self.stopped is True: - log.debug("%s The peer %s has already been told to shut down.", - self._get_log_name(), peer) - defer.returnValue(False) - requests = yield self._send_primary_requests(peer, protocol) - have_request = any(r[1] for r in requests if r[0] is True) - if have_request: - yield self._send_secondary_requests(peer, protocol) - defer.returnValue(have_request) - - def _send_primary_requests(self, peer, protocol): - def handle_error(err): - err.trap(InsufficientFundsError) - self.downloader.insufficient_funds(err) - return False - - def check_if_request_sent(request_sent, request_creator): - if peer not in self._peer_connections: - # This can happen if the connection is told to close - return False - if request_sent is False: - if request_creator in self._peer_connections[peer].request_creators: - self._peer_connections[peer].request_creators.remove(request_creator) - else: - if not request_creator in self._peer_connections[peer].request_creators: - self._peer_connections[peer].request_creators.append(request_creator) - return request_sent - - ds = [] - for p_r_c in self._primary_request_creators: - d = p_r_c.send_next_request(peer, protocol) - d.addErrback(handle_error) - d.addCallback(check_if_request_sent, p_r_c) - ds.append(d) - return defer.DeferredList(ds, fireOnOneErrback=True) - - def _send_secondary_requests(self, peer, protocol): - ds = [ - s_r_c.send_next_request(peer, protocol) - for s_r_c in self._secondary_request_creators - ] - return defer.DeferredList(ds) - - @defer.inlineCallbacks - def manage(self, schedule_next_call=True): - self._manage_deferred = defer.Deferred() - if len(self._peer_connections) < self.max_connections_per_stream: - log.debug("%s have %d connections, looking for %d", - self._get_log_name(), len(self._peer_connections), - self.max_connections_per_stream) - peers = yield self._get_new_peers() - for peer in peers: - self._connect_to_peer(peer) - self._manage_deferred.callback(None) - self._manage_deferred = None - if not self.stopped and schedule_next_call: - self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage) - - def return_shuffled_peers_not_connected_to(self, peers, new_conns_needed): - out = [peer for peer in peers if peer not in self._peer_connections] - random.shuffle(out) - return out[0:new_conns_needed] - - @defer.inlineCallbacks - def _get_new_peers(self): - new_conns_needed = self.max_connections_per_stream - len(self._peer_connections) - if new_conns_needed < 1: - defer.returnValue([]) - # we always get the peer from the first request creator - # must be a type BlobRequester... 
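- # (BlobRequester is the only request creator that can look up peers for specific blobs)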
- request_creator = self._primary_request_creators[0] - log.debug("%s Trying to get a new peer to connect to", self._get_log_name()) - - # find peers for the head blob if configured to do so - if self.seek_head_blob_first: - try: - peers = yield request_creator.get_new_peers_for_head_blob() - peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) - except KeyError: - log.warning("%s does not have a head blob", self._get_log_name()) - peers = [] - else: - peers = [] - - # we didn't find any new peers on the head blob, - # we have to look for the first unavailable blob - if not peers: - peers = yield request_creator.get_new_peers_for_next_unavailable() - peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) - - log.debug("%s Got a list of peers to choose from: %s", - self._get_log_name(), peers) - log.debug("%s Current connections: %s", - self._get_log_name(), self._peer_connections.keys()) - log.debug("%s List of connection states: %s", self._get_log_name(), - [p_c_h.connection.state for p_c_h in self._peer_connections.values()]) - defer.returnValue(peers) - - def _connect_to_peer(self, peer): - if self.stopped: - return - - log.debug("%s Trying to connect to %s", self._get_log_name(), peer) - factory = ClientProtocolFactory(peer, self.rate_limiter, self) - factory.connection_was_made_deferred.addCallback( - lambda c_was_made: self._peer_disconnected(c_was_made, peer)) - self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], - factory) - connection = reactor.connectTCP(peer.host, peer.port, factory, - timeout=self.TCP_CONNECT_TIMEOUT) - self._peer_connections[peer].connection = connection - - def _peer_disconnected(self, connection_was_made, peer): - log.debug("%s protocol disconnected for %s", - self._get_log_name(), peer) - if peer in self._peer_connections: - del self._peer_connections[peer] - if peer in self._connections_closing: - d = self._connections_closing[peer] - del self._connections_closing[peer] - d.callback(True) - return connection_was_made diff --git a/lbrynet/p2p/client/DownloadManager.py b/lbrynet/p2p/client/DownloadManager.py deleted file mode 100644 index d73c049d9..000000000 --- a/lbrynet/p2p/client/DownloadManager.py +++ /dev/null @@ -1,83 +0,0 @@ -import logging -from twisted.internet import defer - - -log = logging.getLogger(__name__) - - -class DownloadManager: - #implements(interfaces.IDownloadManager) - - def __init__(self, blob_manager): - self.blob_manager = blob_manager - self.blob_info_finder = None - self.progress_manager = None - self.blob_handler = None - self.connection_manager = None - self.blobs = {} - self.blob_infos = {} - - ######### IDownloadManager ######### - - def start_downloading(self): - d = self.blob_info_finder.get_initial_blobs() - log.debug("Requested the initial blobs from the info finder") - d.addCallback(self.add_blobs_to_download) - d.addCallback(lambda _: self.resume_downloading()) - return d - - @defer.inlineCallbacks - def resume_downloading(self): - yield self.connection_manager.start() - yield self.progress_manager.start() - return True - - @defer.inlineCallbacks - def stop_downloading(self): - yield self.progress_manager.stop() - yield self.connection_manager.stop() - defer.returnValue(True) - - def add_blobs_to_download(self, blob_infos): - log.debug("Adding %s blobs to blobs", len(blob_infos)) - for blob_info in blob_infos: - if not blob_info.blob_num in self.blobs: - self.blob_infos[blob_info.blob_num] = blob_info - log.debug("Trying to get the blob 
associated with blob hash %s", blob_info.blob_hash) - blob = self.blob_manager.get_blob(blob_info.blob_hash, blob_info.length) - self.blobs[blob_info.blob_num] = blob - log.debug("Added blob (hash: %s, number %s) to the list", blob.blob_hash, blob_info.blob_num) - - def stream_position(self): - return self.progress_manager.stream_position() - - def needed_blobs(self): - return self.progress_manager.needed_blobs() - - def final_blob_num(self): - return self.blob_info_finder.final_blob_num() - - def handle_blob(self, blob_num): - return self.blob_handler.handle_blob(self.blobs[blob_num], self.blob_infos[blob_num]) - - def calculate_total_bytes(self): - return sum([bi.length for bi in self.blob_infos.values()]) - - def calculate_bytes_left_to_output(self): - if not self.blobs: - return self.calculate_total_bytes() - else: - to_be_outputted = [ - b for n, b in self.blobs.items() - if n >= self.progress_manager.last_blob_outputted - ] - return sum([b.length for b in to_be_outputted if b.length is not None]) - - def calculate_bytes_left_to_download(self): - if not self.blobs: - return self.calculate_total_bytes() - else: - return sum([b.length for b in self.needed_blobs() if b.length is not None]) - - def get_head_blob_hash(self): - return self.blobs[0].blob_hash diff --git a/lbrynet/p2p/client/StreamProgressManager.py b/lbrynet/p2p/client/StreamProgressManager.py deleted file mode 100644 index b1c396d0c..000000000 --- a/lbrynet/p2p/client/StreamProgressManager.py +++ /dev/null @@ -1,141 +0,0 @@ -import logging -from twisted.internet import defer, task - - -log = logging.getLogger(__name__) - - -class FullStreamProgressManager: - def __init__(self, finished_callback, blob_manager, download_manager, - delete_blob_after_finished: bool = False, reactor: task.Clock = None): - if not reactor: - from twisted.internet import reactor - self.reactor = reactor - self.finished_callback = finished_callback - self.blob_manager = blob_manager - self.delete_blob_after_finished = delete_blob_after_finished - self.download_manager = download_manager - self.provided_blob_nums = [] - self.last_blob_outputted = -1 - self.stopped = True - self._next_try_to_output_call = None - self.outputting_d = None - self.wrote_first_data = defer.Deferred() - - def start(self): - self.stopped = False - self._next_try_to_output_call = self.reactor.callLater(0, self._try_to_output) - return defer.succeed(True) - - def stop(self): - self.stopped = True - if self._next_try_to_output_call is not None and self._next_try_to_output_call.active(): - self._next_try_to_output_call.cancel() - self._next_try_to_output_call = None - return self._stop_outputting() - - # def blob_downloaded(self, blob, blob_num): - # if self.outputting_d is None: - # self._output_loop() - - def stream_position(self): - blobs = self.download_manager.blobs - if not blobs: - return 0 - else: - for i in range(max(blobs.keys())): - if self._done(i, blobs): - return i - return max(blobs.keys()) + 1 - - def needed_blobs(self): - blobs = self.download_manager.blobs - return [ - b for n, b in blobs.items() - if not b.get_is_verified() and not n in self.provided_blob_nums - ] - - def _finished_outputting(self): - self.finished_callback(True) - - def _try_to_output(self): - self._next_try_to_output_call = self.reactor.callLater(1, self._try_to_output) - if self.outputting_d is None: - self._output_loop() - - def _stop_outputting(self): - if self.outputting_d is not None: - return self.outputting_d - return defer.succeed(None) - - def _finished_with_blob(self, blob_num: 
int) -> None: - if blob_num == 0 and not self.wrote_first_data.called: - self.wrote_first_data.callback(True) - log.debug("In _finished_with_blob, blob_num = %s", str(blob_num)) - if self.delete_blob_after_finished is True: - log.debug("delete_blob_after_finished is True") - blobs = self.download_manager.blobs - if blob_num in blobs: - log.debug("Telling the blob manager, %s, to delete blob %s", - self.blob_manager, blobs[blob_num].blob_hash) - self.blob_manager.delete_blobs([blobs[blob_num].blob_hash]) - else: - log.debug("Blob number %s was not in blobs", str(blob_num)) - else: - log.debug("delete_blob_after_finished is False") - - def _done(self, i: int, blobs: list) -> bool: - """Return true if `i` is a blob number we don't have""" - return ( - i not in blobs or - ( - not blobs[i].get_is_verified() and - i not in self.provided_blob_nums - ) - ) - - def _output_loop(self): - if self.stopped: - if self.outputting_d is not None: - self.outputting_d.callback(True) - self.outputting_d = None - return - - if self.outputting_d is None: - self.outputting_d = defer.Deferred() - blobs = self.download_manager.blobs - - def finished_outputting_blob(): - self.last_blob_outputted += 1 - - def check_if_finished(): - final_blob_num = self.download_manager.final_blob_num() - if final_blob_num is not None and final_blob_num == self.last_blob_outputted: - self._finished_outputting() - self.outputting_d.callback(True) - self.outputting_d = None - else: - self.reactor.callLater(0, self._output_loop) - - current_blob_num = self.last_blob_outputted + 1 - - if current_blob_num in blobs and blobs[current_blob_num].get_is_verified(): - log.debug("Outputting blob %s", str(self.last_blob_outputted + 1)) - self.provided_blob_nums.append(self.last_blob_outputted + 1) - d = self.download_manager.handle_blob(self.last_blob_outputted + 1) - d.addCallback(lambda _: finished_outputting_blob()) - d.addCallback(lambda _: self._finished_with_blob(current_blob_num)) - d.addCallback(lambda _: check_if_finished()) - - def log_error(err): - log.warning("Error outputting blob %s: %s", blobs[current_blob_num].blob_hash, - err.getErrorMessage()) - if self.outputting_d is not None and not self.outputting_d.called: - self.outputting_d.callback(True) - self.outputting_d = None - self.stop() - - d.addErrback(log_error) - else: - self.outputting_d.callback(True) - self.outputting_d = None diff --git a/lbrynet/stream/__init__.py b/lbrynet/stream/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py new file mode 100644 index 000000000..80bea1154 --- /dev/null +++ b/lbrynet/stream/assembler.py @@ -0,0 +1,101 @@ +import os +import binascii +import logging +import typing +import asyncio +from lbrynet.blob import MAX_BLOB_SIZE +from lbrynet.stream.descriptor import StreamDescriptor +if typing.TYPE_CHECKING: + from lbrynet.blob.blob_manager import BlobFileManager + from lbrynet.blob.blob_info import BlobInfo + from lbrynet.blob.blob_file import BlobFile + + +log = logging.getLogger(__name__) + + +def _get_next_available_file_name(download_directory: str, file_name: str) -> str: + base_name, ext = os.path.splitext(file_name) + i = 0 + while os.path.isfile(os.path.join(download_directory, file_name)): + i += 1 + file_name = "%s_%i%s" % (base_name, i, ext) + + return os.path.join(download_directory, file_name) + + +async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str: + return await 
loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name) + + +class StreamAssembler: + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str): + self.loop = loop + self.blob_manager = blob_manager + self.sd_hash = sd_hash + self.sd_blob: 'BlobFile' = None + self.descriptor: StreamDescriptor = None + self.got_descriptor = asyncio.Event(loop=self.loop) + self.wrote_bytes_event = asyncio.Event(loop=self.loop) + self.stream_finished_event = asyncio.Event(loop=self.loop) + self.output_path = '' + self.stream_handle = None + self.written_bytes: int = 0 + + async def _decrypt_blob(self, blob: 'BlobFile', blob_info: 'BlobInfo', key: str): + offset = blob_info.blob_num * (MAX_BLOB_SIZE - 1) + + def _decrypt_and_write(): + if self.stream_handle.closed: + return False + self.stream_handle.seek(offset) + _decrypted = blob.decrypt( + binascii.unhexlify(key), binascii.unhexlify(blob_info.iv.encode()) + ) + self.stream_handle.write(_decrypted) + self.stream_handle.flush() + self.written_bytes += len(_decrypted) + return True + + decrypted = await self.loop.run_in_executor(None, _decrypt_and_write) + if decrypted: + log.info("decrypted %s", blob.blob_hash[:8]) + return + + async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None): + if not os.path.isdir(output_dir): + raise OSError(f"output directory does not exist: '{output_dir}' '{output_file_name}'") + self.sd_blob = await self.get_blob(self.sd_hash) + await self.blob_manager.blob_completed(self.sd_blob) + self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir, + self.sd_blob) + if not self.got_descriptor.is_set(): + self.got_descriptor.set() + self.output_path = await get_next_available_file_name(self.loop, output_dir, + output_file_name or self.descriptor.suggested_file_name) + + self.stream_handle = open(self.output_path, 'wb') + await self.blob_manager.storage.store_stream( + self.sd_blob, self.descriptor + ) + try: + for blob_info in self.descriptor.blobs[:-1]: + while True: + try: + blob = await self.get_blob(blob_info.blob_hash, blob_info.length) + await self._decrypt_blob(blob, blob_info, self.descriptor.key) + break + except ValueError as err: + log.error("failed to decrypt blob %s for stream %s - %s", blob_info.blob_hash, + self.descriptor.sd_hash, str(err)) + continue + if not self.wrote_bytes_event.is_set(): + self.wrote_bytes_event.set() + self.stream_finished_event.set() + finally: + self.stream_handle.close() + + async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': + f = asyncio.Future(loop=self.loop) + f.set_result(self.blob_manager.get_blob(blob_hash, length)) + return await f diff --git a/lbrynet/stream/descriptor.py b/lbrynet/stream/descriptor.py new file mode 100644 index 000000000..453852d28 --- /dev/null +++ b/lbrynet/stream/descriptor.py @@ -0,0 +1,186 @@ +import os +import json +import binascii +import logging +import typing +import asyncio +from cryptography.hazmat.primitives.ciphers.algorithms import AES +from lbrynet.blob import MAX_BLOB_SIZE +from lbrynet.blob.blob_info import BlobInfo +from lbrynet.blob.blob_file import BlobFile +from lbrynet.cryptoutils import get_lbry_hash_obj +from lbrynet.error import InvalidStreamDescriptorError + +log = logging.getLogger(__name__) + + +def format_sd_info(stream_name: str, key: str, suggested_file_name: str, stream_hash: str, + blobs: typing.List[typing.Dict]) -> typing.Dict: + 
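+ # note: callers pass stream_name and suggested_file_name hex encoded, and key as a hex string (see as_json() below)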
return { + "stream_type": "lbryfile", + "stream_name": stream_name, + "key": key, + "suggested_file_name": suggested_file_name, + "stream_hash": stream_hash, + "blobs": blobs + } + + +def random_iv_generator() -> typing.Generator[bytes, None, None]: + while 1: + yield os.urandom(AES.block_size // 8) + + +def file_reader(file_path: str): + length = int(os.stat(file_path).st_size) + offset = 0 + + with open(file_path, 'rb') as stream_file: + while offset < length: + bytes_to_read = min((length - offset), MAX_BLOB_SIZE - 1) + if not bytes_to_read: + break + blob_bytes = stream_file.read(bytes_to_read) + yield blob_bytes + offset += bytes_to_read + + +class StreamDescriptor: + def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, stream_name: str, key: str, + suggested_file_name: str, blobs: typing.List[BlobInfo], stream_hash: typing.Optional[str] = None, + sd_hash: typing.Optional[str] = None): + self.loop = loop + self.blob_dir = blob_dir + self.stream_name = stream_name + self.key = key + self.suggested_file_name = suggested_file_name + self.blobs = blobs + self.stream_hash = stream_hash or self.get_stream_hash() + self.sd_hash = sd_hash + + def get_stream_hash(self) -> str: + return self.calculate_stream_hash( + binascii.hexlify(self.stream_name.encode()), self.key.encode(), + binascii.hexlify(self.suggested_file_name.encode()), + [blob_info.as_dict() for blob_info in self.blobs] + ) + + def calculate_sd_hash(self) -> str: + h = get_lbry_hash_obj() + h.update(self.as_json()) + return h.hexdigest() + + def as_json(self) -> bytes: + return json.dumps( + format_sd_info(binascii.hexlify(self.stream_name.encode()).decode(), self.key, + binascii.hexlify(self.suggested_file_name.encode()).decode(), + self.stream_hash, + [blob_info.as_dict() for blob_info in self.blobs]), sort_keys=True + ).encode() + + async def make_sd_blob(self): + sd_hash = self.calculate_sd_hash() + sd_data = self.as_json() + sd_blob = BlobFile(self.loop, self.blob_dir, sd_hash, len(sd_data)) + if not sd_blob.get_is_verified(): + writer = sd_blob.open_for_writing() + writer.write(sd_data) + await sd_blob.verified.wait() + await sd_blob.close() + return sd_blob + + @classmethod + def _from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str, + blob: BlobFile) -> 'StreamDescriptor': + assert os.path.isfile(blob.file_path) + with open(blob.file_path, 'rb') as f: + json_bytes = f.read() + decoded = json.loads(json_bytes.decode()) + if decoded['blobs'][-1]['length'] != 0: + raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") + if any([blob_info['length'] == 0 for blob_info in decoded['blobs'][:-1]]): + raise InvalidStreamDescriptorError("Contains zero-length data blob") + if 'blob_hash' in decoded['blobs'][-1]: + raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") + descriptor = cls( + loop, blob_dir, + binascii.unhexlify(decoded['stream_name']).decode(), + decoded['key'], + binascii.unhexlify(decoded['suggested_file_name']).decode(), + [BlobInfo(info['blob_num'], info['length'], info['iv'], info.get('blob_hash')) + for info in decoded['blobs']], + decoded['stream_hash'], + blob.blob_hash + ) + if descriptor.get_stream_hash() != decoded['stream_hash']: + raise InvalidStreamDescriptorError("Stream hash does not match stream metadata") + return descriptor + + @classmethod + async def from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str, + blob: BlobFile) -> 'StreamDescriptor': + return await loop.run_in_executor(None, lambda: 
cls._from_stream_descriptor_blob(loop, blob_dir, blob)) + + @staticmethod + def get_blob_hashsum(b: typing.Dict): + length = b['length'] + if length != 0: + blob_hash = b['blob_hash'] + else: + blob_hash = None + blob_num = b['blob_num'] + iv = b['iv'] + blob_hashsum = get_lbry_hash_obj() + if length != 0: + blob_hashsum.update(blob_hash.encode()) + blob_hashsum.update(str(blob_num).encode()) + blob_hashsum.update(iv.encode()) + blob_hashsum.update(str(length).encode()) + return blob_hashsum.digest() + + @staticmethod + def calculate_stream_hash(hex_stream_name: bytes, key: bytes, hex_suggested_file_name: bytes, + blob_infos: typing.List[typing.Dict]) -> str: + h = get_lbry_hash_obj() + h.update(hex_stream_name) + h.update(key) + h.update(hex_suggested_file_name) + blobs_hashsum = get_lbry_hash_obj() + for blob in blob_infos: + blobs_hashsum.update(StreamDescriptor.get_blob_hashsum(blob)) + h.update(blobs_hashsum.digest()) + return h.hexdigest() + + @classmethod + async def create_stream(cls, loop: asyncio.BaseEventLoop, blob_dir: str, + file_path: str, key: typing.Optional[bytes] = None, + iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None + ) -> 'StreamDescriptor': + + blobs: typing.List[BlobInfo] = [] + + iv_generator = iv_generator or random_iv_generator() + key = key or os.urandom(AES.block_size // 8) + blob_num = -1 + for blob_bytes in file_reader(file_path): + blob_num += 1 + blob_info = await BlobFile.create_from_unencrypted( + loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num + ) + blobs.append(blob_info) + blobs.append( + BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode())) # add the stream terminator + descriptor = cls( + loop, blob_dir, os.path.basename(file_path), binascii.hexlify(key).decode(), os.path.basename(file_path), + blobs + ) + sd_blob = await descriptor.make_sd_blob() + descriptor.sd_hash = sd_blob.blob_hash + return descriptor + + def lower_bound_decrypted_length(self) -> int: + length = sum((blob.length - 1 for blob in self.blobs[:-2])) + return length + self.blobs[-2].length - (AES.block_size // 8) + + def upper_bound_decrypted_length(self) -> int: + return self.lower_bound_decrypted_length() + (AES.block_size // 8) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py new file mode 100644 index 000000000..cc8b7439a --- /dev/null +++ b/lbrynet/stream/downloader.py @@ -0,0 +1,235 @@ +import os +import asyncio +import typing +import logging +from lbrynet import conf +from lbrynet.utils import drain_tasks, cancel_task +from lbrynet.stream.assembler import StreamAssembler +from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob +if typing.TYPE_CHECKING: + from lbrynet.dht.node import Node + from lbrynet.dht.peer import KademliaPeer + from lbrynet.blob.blob_manager import BlobFileManager + from lbrynet.blob.blob_file import BlobFile + +log = logging.getLogger(__name__) + + +def drain_into(a: list, b: list): + while a: + b.append(a.pop()) + + +class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor to inherit BlobDownloader + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str, + peer_timeout: float, peer_connect_timeout: float, output_dir: typing.Optional[str] = None, + output_file_name: typing.Optional[str] = None, + fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None): + super().__init__(loop, blob_manager, sd_hash) + self.peer_timeout = peer_timeout + self.peer_connect_timeout = 
peer_connect_timeout + self.current_blob: 'BlobFile' = None + + self.download_task: asyncio.Task = None + self.accumulate_connections_task: asyncio.Task = None + self.new_peer_event = asyncio.Event(loop=self.loop) + self.active_connections: typing.Dict['KademliaPeer', BlobExchangeClientProtocol] = {} + self.running_download_requests: typing.List[asyncio.Task] = [] + self.requested_from: typing.Dict[str, typing.Dict['KademliaPeer', asyncio.Task]] = {} + self.output_dir = output_dir or os.getcwd() + self.output_file_name = output_file_name + self._lock = asyncio.Lock(loop=self.loop) + self.max_connections_per_stream = 8 if not conf.settings else conf.settings['max_connections_per_stream'] + self.fixed_peers = fixed_peers or [] + + async def _update_current_blob(self, blob: 'BlobFile'): + async with self._lock: + drain_tasks(self.running_download_requests) + self.current_blob = blob + if not blob.get_is_verified(): + self._update_requests() + + async def _request_blob(self, peer: 'KademliaPeer'): + if self.current_blob.get_is_verified(): + log.info("already verified") + return + if peer not in self.active_connections: + log.warning("not active, adding: %s", str(peer)) + self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.peer_timeout) + protocol = self.active_connections[peer] + success, keep_connection = await request_blob(self.loop, self.current_blob, protocol, + peer.address, peer.tcp_port, self.peer_connect_timeout) + await protocol.close() + if not keep_connection: + log.info("drop peer %s:%i", peer.address, peer.tcp_port) + if peer in self.active_connections: + async with self._lock: + del self.active_connections[peer] + return + log.info("keep peer %s:%i", peer.address, peer.tcp_port) + + def _update_requests(self): + self.new_peer_event.clear() + if self.current_blob.blob_hash not in self.requested_from: + self.requested_from[self.current_blob.blob_hash] = {} + to_add = [] + for peer in self.active_connections.keys(): + if peer not in self.requested_from[self.current_blob.blob_hash] and peer not in to_add: + to_add.append(peer) + if to_add or self.running_download_requests: + log.info("adding download probes for %i peers to %i already active", + min(len(to_add), 8 - len(self.running_download_requests)), + len(self.running_download_requests)) + else: + log.info("downloader idle...") + for peer in to_add: + if len(self.running_download_requests) >= 8: + break + task = self.loop.create_task(self._request_blob(peer)) + self.requested_from[self.current_blob.blob_hash][peer] = task + self.running_download_requests.append(task) + + async def wait_for_download_or_new_peer(self) -> typing.Optional['BlobFile']: + async with self._lock: + if len(self.running_download_requests) < self.max_connections_per_stream: + # update the running download requests + self._update_requests() + + # drain the tasks into a temporary list + download_tasks = [] + drain_into(self.running_download_requests, download_tasks) + + got_new_peer = self.loop.create_task(self.new_peer_event.wait()) + + # wait for a new peer to be added or for a download attempt to finish + await asyncio.wait([got_new_peer] + download_tasks, return_when='FIRST_COMPLETED', + loop=self.loop) + if got_new_peer and not got_new_peer.done(): + got_new_peer.cancel() + async with self._lock: + if self.current_blob.get_is_verified(): + if got_new_peer and not got_new_peer.done(): + got_new_peer.cancel() + drain_tasks(download_tasks) + return self.current_blob + else: + for task in download_tasks: + if task and not task.done(): + 
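+ # the current blob is still unverified, so re-queue the unfinished probes for the next wait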
self.running_download_requests.append(task) + return + + async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': + blob = self.blob_manager.get_blob(blob_hash, length) + await self._update_current_blob(blob) + if blob.get_is_verified(): + return blob + + # the blob must be downloaded + try: + while not self.current_blob.get_is_verified(): + if not self.active_connections: # wait for a new connection + await self.new_peer_event.wait() + continue + blob = await self.wait_for_download_or_new_peer() + if blob: + drain_tasks(self.running_download_requests) + return blob + return blob + except asyncio.CancelledError: + drain_tasks(self.running_download_requests) + raise + + def _add_peer_protocols(self, peers: typing.List['KademliaPeer']): + added = 0 + for peer in peers: + if peer not in self.active_connections: + self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.peer_timeout) + added += 1 + if added: + if not self.new_peer_event.is_set(): + log.info("added %i new peers", len(peers)) + self.new_peer_event.set() + + async def _accumulate_connections(self, node: 'Node'): + blob_queue = asyncio.Queue(loop=self.loop) + blob_queue.put_nowait(self.sd_hash) + task = asyncio.create_task(self.got_descriptor.wait()) + added_peers = asyncio.Event(loop=self.loop) + add_fixed_peers_timer: typing.Optional[asyncio.Handle] = None + + if self.fixed_peers: + def check_added_peers(): + if not added_peers.is_set(): + self._add_peer_protocols(self.fixed_peers) + log.info("no dht peers for download yet, adding fixed peer") + added_peers.set() + + add_fixed_peers_timer = self.loop.call_later(2, check_added_peers) + + def got_descriptor(f): + try: + f.result() + except asyncio.CancelledError: + return + log.info("add head blob hash to peer search") + blob_queue.put_nowait(self.descriptor.blobs[0].blob_hash) + + task.add_done_callback(got_descriptor) + try: + async with node.stream_peer_search_junction(blob_queue) as search_junction: + async for peers in search_junction: + if not isinstance(peers, list): # TODO: what's up with this? 
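+ # defensive: the search junction should only yield lists of peers, so log and skip anything else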
+ log.error("not a list: %s %s", peers, str(type(peers))) + else: + self._add_peer_protocols(peers) + if not added_peers.is_set(): + added_peers.set() + return + except asyncio.CancelledError: + pass + finally: + if task and not task.done(): + task.cancel() + log.info("cancelled head blob task") + if add_fixed_peers_timer and not add_fixed_peers_timer.cancelled(): + add_fixed_peers_timer.cancel() + + async def stop(self): + cancel_task(self.accumulate_connections_task) + self.accumulate_connections_task = None + drain_tasks(self.running_download_requests) + + while self.requested_from: + _, peer_task_dict = self.requested_from.popitem() + while peer_task_dict: + peer, task = peer_task_dict.popitem() + try: + cancel_task(task) + except asyncio.CancelledError: + pass + + while self.active_connections: + _, client = self.active_connections.popitem() + if client: + await client.close() + log.info("stopped downloader") + + async def _download(self): + try: + + log.info("download and decrypt stream") + await self.assemble_decrypted_stream(self.output_dir, self.output_file_name) + log.info( + "downloaded stream %s -> %s", self.sd_hash, self.output_path + ) + await self.blob_manager.storage.change_file_status( + self.descriptor.stream_hash, 'finished' + ) + except asyncio.CancelledError: + pass + finally: + await self.stop() + + def download(self, node: 'Node'): + self.accumulate_connections_task = self.loop.create_task(self._accumulate_connections(node)) + self.download_task = self.loop.create_task(self._download()) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py new file mode 100644 index 000000000..617aabe72 --- /dev/null +++ b/lbrynet/stream/managed_stream.py @@ -0,0 +1,151 @@ +import os +import asyncio +import typing +import logging +from lbrynet.extras.daemon.mime_types import guess_media_type +from lbrynet.stream.downloader import StreamDownloader +if typing.TYPE_CHECKING: + from lbrynet.extras.daemon.storage import StoredStreamClaim + from lbrynet.blob.blob_manager import BlobFileManager + from lbrynet.stream.descriptor import StreamDescriptor + +log = logging.getLogger(__name__) + + +class ManagedStream: + STATUS_RUNNING = "running" + STATUS_STOPPED = "stopped" + STATUS_FINISHED = "finished" + + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', descriptor: 'StreamDescriptor', + download_directory: str, file_name: str, downloader: typing.Optional[StreamDownloader] = None, + status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional['StoredStreamClaim'] = None): + self.loop = loop + self.blob_manager = blob_manager + self.download_directory = download_directory + self.file_name = file_name + self.descriptor = descriptor + self.downloader = downloader + self.stream_hash = descriptor.stream_hash + self.stream_claim_info = claim + self._status = status + self._store_after_finished: asyncio.Task = None + + @property + def status(self) -> str: + return self._status + + def update_status(self, status: str): + assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED] + self._status = status + + @property + def finished(self) -> bool: + return self.status == self.STATUS_FINISHED + + @property + def running(self) -> bool: + return self.status == self.STATUS_RUNNING + + @property + def claim_id(self) -> typing.Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.claim_id + + @property + def txid(self) -> typing.Optional[str]: + return None if not self.stream_claim_info 
else self.stream_claim_info.txid + + @property + def nout(self) -> typing.Optional[int]: + return None if not self.stream_claim_info else self.stream_claim_info.nout + + @property + def outpoint(self) -> typing.Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.outpoint + + @property + def claim_height(self) -> typing.Optional[int]: + return None if not self.stream_claim_info else self.stream_claim_info.height + + @property + def channel_claim_id(self) -> typing.Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id + + @property + def channel_name(self) -> typing.Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.channel_name + + @property + def claim_name(self) -> typing.Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.claim_name + + @property + def metadata(self) ->typing.Optional[typing.Dict]: + return None if not self.stream_claim_info else self.stream_claim_info.claim.claim_dict['stream']['metadata'] + + @property + def blobs_completed(self) -> int: + return sum([1 if self.blob_manager.get_blob(b.blob_hash).get_is_verified() else 0 + for b in self.descriptor.blobs[:-1]]) + + @property + def blobs_in_stream(self) -> int: + return len(self.descriptor.blobs) - 1 + + @property + def sd_hash(self): + return self.descriptor.sd_hash + + def as_dict(self) -> typing.Dict: + full_path = os.path.join(self.download_directory, self.file_name) + if not os.path.exists(full_path): + full_path = None + mime_type = guess_media_type(os.path.basename(self.file_name)) + return { + 'completed': self.finished, + 'file_name': self.file_name, + 'download_directory': self.download_directory, + 'points_paid': 0.0, + 'stopped': not self.running, + 'stream_hash': self.stream_hash, + 'stream_name': self.descriptor.stream_name, + 'suggested_file_name': self.descriptor.suggested_file_name, + 'sd_hash': self.descriptor.sd_hash, + 'download_path': full_path, + 'mime_type': mime_type, + 'key': self.descriptor.key, + 'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(), + 'total_bytes': self.descriptor.upper_bound_decrypted_length(), + 'written_bytes': None if not full_path else self.downloader.written_bytes or os.stat(full_path).st_size, + 'blobs_completed': self.blobs_completed, + 'blobs_in_stream': self.blobs_in_stream, + 'status': self.status, + 'claim_id': self.claim_id, + 'txid': self.txid, + 'nout': self.nout, + 'outpoint': self.outpoint, + 'metadata': self.metadata, + 'channel_claim_id': self.channel_claim_id, + 'channel_name': self.channel_name, + 'claim_name': self.claim_name + } + + @classmethod + async def create(cls, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', + file_path: str) -> 'ManagedStream': + descriptor = await StreamDescriptor.create_stream( + loop, blob_manager.blob_dir, file_path + ) + sd_blob = blob_manager.get_blob(descriptor.sd_hash) + await blob_manager.blob_completed(sd_blob) + await blob_manager.storage.store_stream( + blob_manager.get_blob(descriptor.sd_hash), descriptor + ) + return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path), + status=cls.STATUS_FINISHED) + + async def stop_download(self): + if self.downloader: + await self.downloader.stop() + if not self.finished: + self.update_status(self.STATUS_STOPPED) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py new file mode 100644 index 000000000..313a25e2b --- /dev/null 
+++ b/lbrynet/stream/stream_manager.py @@ -0,0 +1,255 @@ +import os +import asyncio +import typing +import binascii +import logging +from lbrynet.stream.downloader import StreamDownloader +from lbrynet.stream.managed_stream import ManagedStream +from lbrynet.schema.claim import ClaimDict +from lbrynet.extras.daemon.storage import StoredStreamClaim, lbc_to_dewies +if typing.TYPE_CHECKING: + from lbrynet.blob.blob_manager import BlobFileManager + from lbrynet.dht.peer import KademliaPeer + from lbrynet.dht.node import Node + from lbrynet.extras.daemon.storage import SQLiteStorage + from lbrynet.extras.wallet import LbryWalletManager + +log = logging.getLogger(__name__) + + +filter_fields = [ + 'status', + 'file_name', + 'sd_hash', + 'stream_hash', + 'claim_name', + 'claim_height', + 'claim_id', + 'outpoint', + 'txid', + 'nout', + 'channel_claim_id', + 'channel_name', +] + +comparison_operators = { + 'eq': lambda a, b: a == b, + 'ne': lambda a, b: a != b, + 'g': lambda a, b: a > b, + 'l': lambda a, b: a < b, + 'ge': lambda a, b: a >= b, + 'le': lambda a, b: a <= b, +} + + +class StreamManager: + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', wallet: 'LbryWalletManager', + storage: 'SQLiteStorage', node: 'Node', peer_timeout: float, peer_connect_timeout: float, + fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None): + self.loop = loop + self.blob_manager = blob_manager + self.wallet = wallet + self.storage = storage + self.node = node + self.peer_timeout = peer_timeout + self.peer_connect_timeout = peer_connect_timeout + self.streams: typing.Set[ManagedStream] = set() + self.starting_streams: typing.Dict[str, asyncio.Future] = {} + self.resume_downloading_task: asyncio.Task = None + self.update_stream_finished_futs: typing.List[asyncio.Future] = [] + self.fixed_peers = fixed_peers + + async def load_streams_from_database(self): + infos = await self.storage.get_all_lbry_files() + for file_info in infos: + sd_blob = self.blob_manager.get_blob(file_info['sd_hash']) + if sd_blob.get_is_verified(): + descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash) + downloader = StreamDownloader( + self.loop, self.blob_manager, descriptor.sd_hash, self.peer_timeout, + self.peer_connect_timeout, binascii.unhexlify(file_info['download_directory']).decode(), + binascii.unhexlify(file_info['file_name']).decode(), self.fixed_peers + ) + stream = ManagedStream( + self.loop, self.blob_manager, descriptor, + binascii.unhexlify(file_info['download_directory']).decode(), + binascii.unhexlify(file_info['file_name']).decode(), + downloader, file_info['status'], file_info['claim'] + ) + self.streams.add(stream) + + async def resume(self): + await self.node.joined.wait() + resumed = 0 + for stream in self.streams: + if stream.status == ManagedStream.STATUS_RUNNING: + resumed += 1 + stream.downloader.download(self.node) + self.wait_for_stream_finished(stream) + if resumed: + log.info("resuming %i downloads", resumed) + + async def start(self): + await self.load_streams_from_database() + self.resume_downloading_task = self.loop.create_task(self.resume()) + + async def stop(self): + if self.resume_downloading_task and not self.resume_downloading_task.done(): + self.resume_downloading_task.cancel() + while self.streams: + stream = self.streams.pop() + await stream.stop_download() + while self.update_stream_finished_futs: + self.update_stream_finished_futs.pop().cancel() + + async def create_stream(self, file_path: str) -> ManagedStream: + stream = await 
ManagedStream.create(self.loop, self.blob_manager, file_path) + self.streams.add(stream) + return stream + + async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False): + await stream.stop_download() + self.streams.remove(stream) + await self.storage.delete_stream(stream.descriptor) + + blob_hashes = [stream.sd_hash] + for blob_info in stream.descriptor.blobs[:-1]: + blob_hashes.append(blob_info.blob_hash) + for blob_hash in blob_hashes: + blob = self.blob_manager.get_blob(blob_hash) + if blob.get_is_verified(): + await blob.delete() + + if delete_file: + path = os.path.join(stream.download_directory, stream.file_name) + if os.path.isfile(path): + os.remove(path) + + def wait_for_stream_finished(self, stream: ManagedStream): + async def _wait_for_stream_finished(): + if stream.downloader and stream.running: + try: + await stream.downloader.stream_finished_event.wait() + stream.update_status(ManagedStream.STATUS_FINISHED) + except asyncio.CancelledError: + pass + task = self.loop.create_task(_wait_for_stream_finished()) + self.update_stream_finished_futs.append(task) + task.add_done_callback( + lambda _: None if task not in self.update_stream_finished_futs else + self.update_stream_finished_futs.remove(task) + ) + + async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, + file_name: typing.Optional[str] = None, data_rate: typing.Optional[int] = 0, + sd_blob_timeout: typing.Optional[float] = 60 + ) -> typing.Optional[ManagedStream]: + + claim = ClaimDict.load_dict(claim_info['value']) + downloader = StreamDownloader(self.loop, self.blob_manager, claim.source_hash.decode(), self.peer_timeout, + self.peer_connect_timeout, download_directory, file_name, self.fixed_peers) + try: + downloader.download(node) + await asyncio.wait_for(downloader.got_descriptor.wait(), sd_blob_timeout) + log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name']) + except (asyncio.TimeoutError, asyncio.CancelledError): + log.info("stream timeout") + await downloader.stop() + log.info("stopped stream") + return + if not await self.blob_manager.storage.stream_exists(downloader.sd_hash): + await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor) + if not await self.blob_manager.storage.file_exists(downloader.sd_hash): + await self.blob_manager.storage.save_downloaded_file( + downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory, + data_rate + ) + await self.blob_manager.storage.save_content_claim( + downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" + ) + + stored_claim = StoredStreamClaim( + downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], + claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'], + claim.certificate_id, claim_info['address'], claim_info['claim_sequence'], + claim_info.get('channel_name') + ) + stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory, + os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING, + stored_claim) + self.streams.add(stream) + try: + await stream.downloader.wrote_bytes_event.wait() + self.wait_for_stream_finished(stream) + return stream + except asyncio.CancelledError: + await downloader.stop() + + async def download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, + file_name: 
typing.Optional[str] = None, + sd_blob_timeout: typing.Optional[float] = 60, + fee_amount: typing.Optional[float] = 0.0, + fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]: + log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) + claim = ClaimDict.load_dict(claim_info['value']) + if fee_address and fee_amount: + if fee_amount > await self.wallet.default_account.get_balance(): + raise Exception("not enough funds") + sd_hash = claim.source_hash.decode() + if sd_hash in self.starting_streams: + return await self.starting_streams[sd_hash] + already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams)) + if already_started: + return already_started[0] + + self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop) + stream_task = self.loop.create_task( + self._download_stream_from_claim(node, download_directory, claim_info, file_name, 0, sd_blob_timeout) + ) + try: + await asyncio.wait_for(stream_task, sd_blob_timeout) + stream = await stream_task + self.starting_streams[sd_hash].set_result(stream) + if fee_address and fee_amount: + await self.wallet.send_amount_to_address(lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')) + return stream + except (asyncio.TimeoutError, asyncio.CancelledError): + return + finally: + if sd_hash in self.starting_streams: + del self.starting_streams[sd_hash] + log.info("returned from get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) + + def get_filtered_streams(self, sort_by: typing.Optional[str] = None, reverse: typing.Optional[bool] = False, + comparison: typing.Optional[str] = None, + **search_by) -> typing.List[ManagedStream]: + """ + Get a list of filtered and sorted ManagedStream objects + + :param sort_by: field to sort by + :param reverse: reverse sorting + :param comparison: comparison operator used for filtering + :param search_by: fields and values to filter by + """ + if sort_by and sort_by not in filter_fields: + raise ValueError(f"'{sort_by}' is not a valid field to sort by") + if comparison and comparison not in comparison_operators: + raise ValueError(f"'{comparison}' is not a valid comparison") + for search in search_by.keys(): + if search not in filter_fields: + raise ValueError(f"'{search}' is not a valid search operation") + if search_by: + comparison = comparison or 'eq' + streams = [] + for stream in self.streams: + for search, val in search_by.items(): + if comparison_operators[comparison](getattr(stream, search), val): + streams.append(stream) + break + else: + streams = list(self.streams) + if sort_by: + streams.sort(key=lambda s: getattr(s, sort_by)) + if reverse: + streams.reverse() + return streams diff --git a/tests/unit/stream/__init__.py b/tests/unit/stream/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/stream/test_assembler.py b/tests/unit/stream/test_assembler.py new file mode 100644 index 000000000..c0c7c68f4 --- /dev/null +++ b/tests/unit/stream/test_assembler.py @@ -0,0 +1,76 @@ +import os +import asyncio +import tempfile +import shutil +from torba.testcase import AsyncioTestCase +from lbrynet.blob.blob_manager import BlobFileManager +from lbrynet.blob.blob_file import MAX_BLOB_SIZE +from lbrynet.extras.daemon.storage import SQLiteStorage +from lbrynet.stream.descriptor import StreamDescriptor +from lbrynet.stream.assembler import StreamAssembler + + +class TestStreamAssembler(AsyncioTestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + self.key = b'deadbeef' * 4 + 
self.cleartext = b'test' + + async def test_create_and_decrypt_one_blob_stream(self): + tmp_dir = tempfile.mkdtemp() + self.addCleanup(lambda: shutil.rmtree(tmp_dir)) + self.storage = SQLiteStorage(os.path.join(tmp_dir, "lbrynet.sqlite")) + await self.storage.open() + self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage) + + download_dir = tempfile.mkdtemp() + self.addCleanup(lambda: shutil.rmtree(download_dir)) + + # create the stream + file_path = os.path.join(tmp_dir, "test_file") + with open(file_path, 'wb') as f: + f.write(self.cleartext) + + sd = await StreamDescriptor.create_stream(self.loop, tmp_dir, file_path, key=self.key) + + # copy blob files + sd_hash = sd.calculate_sd_hash() + shutil.copy(os.path.join(tmp_dir, sd_hash), os.path.join(download_dir, sd_hash)) + for blob_info in sd.blobs: + if blob_info.blob_hash: + shutil.copy(os.path.join(tmp_dir, blob_info.blob_hash), os.path.join(download_dir, blob_info.blob_hash)) + downloader_storage = SQLiteStorage(os.path.join(download_dir, "lbrynet.sqlite")) + await downloader_storage.open() + + # add the blobs to the blob table (this would happen upon a blob download finishing) + downloader_blob_manager = BlobFileManager(self.loop, download_dir, downloader_storage) + descriptor = await downloader_blob_manager.get_stream_descriptor(sd_hash) + + # assemble the decrypted file + assembler = StreamAssembler(self.loop, downloader_blob_manager, descriptor.sd_hash) + await assembler.assemble_decrypted_stream(download_dir) + + with open(os.path.join(download_dir, "test_file"), "rb") as f: + decrypted = f.read() + self.assertEqual(decrypted, self.cleartext) + self.assertEqual(True, self.blob_manager.get_blob(sd_hash).get_is_verified()) + + await downloader_storage.close() + await self.storage.close() + + async def test_create_and_decrypt_multi_blob_stream(self): + self.cleartext = b'test\n' * 20000000 + await self.test_create_and_decrypt_one_blob_stream() + + async def test_create_and_decrypt_padding(self): + for i in range(16): + self.cleartext = os.urandom((MAX_BLOB_SIZE*2) + i) + await self.test_create_and_decrypt_one_blob_stream() + + for i in range(16): + self.cleartext = os.urandom((MAX_BLOB_SIZE*2) - i) + await self.test_create_and_decrypt_one_blob_stream() + + async def test_create_and_decrypt_random(self): + self.cleartext = os.urandom(20000000) + await self.test_create_and_decrypt_one_blob_stream() diff --git a/tests/unit/stream/test_downloader.py b/tests/unit/stream/test_downloader.py new file mode 100644 index 000000000..cd791dfd3 --- /dev/null +++ b/tests/unit/stream/test_downloader.py @@ -0,0 +1,81 @@ +import os +import mock +import asyncio +import contextlib +from lbrynet.stream.descriptor import StreamDescriptor +from lbrynet.stream.downloader import StreamDownloader +from lbrynet.dht.node import Node +from lbrynet.dht.peer import KademliaPeer +from lbrynet.blob.blob_file import MAX_BLOB_SIZE +from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase + + +class TestStreamDownloader(BlobExchangeTestBase): + async def setup_stream(self, blob_count: int = 10): + self.stream_bytes = b'' + for _ in range(blob_count): + self.stream_bytes += os.urandom((MAX_BLOB_SIZE - 1)) + # create the stream + file_path = os.path.join(self.server_dir, "test_file") + with open(file_path, 'wb') as f: + f.write(self.stream_bytes) + descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path) + self.sd_hash = descriptor.calculate_sd_hash() + self.downloader = 
StreamDownloader(self.loop, self.client_blob_manager, self.sd_hash, 3, 3, self.client_dir) + + async def _test_transfer_stream(self, blob_count: int, mock_peer_search=None): + await self.setup_stream(blob_count) + + mock_node = mock.Mock(spec=Node) + + @contextlib.asynccontextmanager + async def _mock_peer_search(*_): + async def _gen(): + yield [self.server_from_client] + return + + yield _gen() + + mock_node.stream_peer_search_junction = mock_peer_search or _mock_peer_search + + self.downloader.download(mock_node) + await self.downloader.stream_finished_event.wait() + await self.downloader.stop() + self.assertTrue(os.path.isfile(self.downloader.output_path)) + with open(self.downloader.output_path, 'rb') as f: + self.assertEqual(f.read(), self.stream_bytes) + + async def test_transfer_stream(self): + await self._test_transfer_stream(10) + + async def test_transfer_hundred_blob_stream(self): + await self._test_transfer_stream(100) + + async def test_transfer_stream_bad_first_peer_good_second(self): + await self.setup_stream(2) + + mock_node = mock.Mock(spec=Node) + + bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334) + + @contextlib.asynccontextmanager + async def mock_peer_search(*_): + async def _gen(): + await asyncio.sleep(0.05, loop=self.loop) + yield [bad_peer] + await asyncio.sleep(0.1, loop=self.loop) + yield [self.server_from_client] + return + + yield _gen() + + mock_node.stream_peer_search_junction = mock_peer_search + + self.downloader.download(mock_node) + await self.downloader.stream_finished_event.wait() + await self.downloader.stop() + self.assertTrue(os.path.isfile(self.downloader.output_path)) + with open(self.downloader.output_path, 'rb') as f: + self.assertEqual(f.read(), self.stream_bytes) + # self.assertIs(self.server_from_client.tcp_last_down, None) + # self.assertIsNot(bad_peer.tcp_last_down, None)
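Taken together, the new asyncio stream modules replace the Twisted downloader stack deleted above. Below is a minimal end-to-end sketch of how they compose, modeled on the unit tests in this changeset; `node` is assumed to be an already-joined DHT Node (the tests substitute a mock), and the directory arguments are hypothetical.

    import os
    import asyncio
    from lbrynet.blob.blob_manager import BlobFileManager
    from lbrynet.extras.daemon.storage import SQLiteStorage
    from lbrynet.stream.managed_stream import ManagedStream
    from lbrynet.stream.downloader import StreamDownloader

    async def publish_then_fetch(node, file_path: str, blob_dir: str, download_dir: str):
        loop = asyncio.get_event_loop()
        # one storage/blob manager pair backs both publishing and downloading in this sketch
        storage = SQLiteStorage(os.path.join(blob_dir, "lbrynet.sqlite"))
        await storage.open()
        blob_manager = BlobFileManager(loop, blob_dir, storage)

        # encrypt file_path into blobs, write the sd blob, and record the stream
        stream = await ManagedStream.create(loop, blob_manager, file_path)

        # reassemble the decrypted file; peers come from the node's search junction
        downloader = StreamDownloader(loop, blob_manager, stream.sd_hash,
                                      peer_timeout=3.0, peer_connect_timeout=3.0,
                                      output_dir=download_dir)
        downloader.download(node)
        await downloader.stream_finished_event.wait()
        await downloader.stop()
        print("assembled:", downloader.output_path)
        await storage.close()

Because this sketch shares one BlobFileManager between both sides, every blob is already verified locally and StreamDownloader.get_blob returns immediately; against a remote publisher the same calls exercise the peer probes in _request_blob.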