From f94b04bb19385216abacbbea428a8f904314d4c1 Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Mon, 21 Sep 2015 22:06:04 -0400 Subject: [PATCH] make class to encompass stream metadata --- lbrynet/core/StreamDescriptor.py | 34 ++++++++-- lbrynet/lbryfile/StreamDescriptor.py | 1 + lbrynet/lbryfile/client/LBRYFileDownloader.py | 25 +++++-- lbrynet/lbryfilemanager/LBRYFileDownloader.py | 18 ++++- .../lbrylive/client/LiveStreamDownloader.py | 19 ++++-- lbrynet/lbrynet_console/ControlHandlers.py | 66 +++++++++---------- lbrynet/lbrynet_gui/LBRYGui.py | 15 +++-- tests/functional_tests.py | 20 +++--- 8 files changed, 130 insertions(+), 68 deletions(-) diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index f0b7d380a..a965969e8 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -107,6 +107,19 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter): return blob_creator.close() +class StreamMetadata(object): + FROM_BLOB = 1 + FROM_PLAIN = 2 + + def __init__(self, validator, options, factories): + self.validator = validator + self.options = options + self.factories = factories + self.metadata_source = None + self.source_blob_hash = None + self.source_file = None + + class StreamDescriptorIdentifier(object): """Tries to determine the type of stream described by the stream descriptor using the 'stream_type' field. Keeps a list of StreamDescriptorValidators and StreamDownloaderFactorys @@ -155,17 +168,28 @@ class StreamDescriptorIdentifier(object): """ self._stream_downloader_factories[stream_type].append(factory) - def get_info_and_factories_for_sd_file(self, sd_path): + def _return_metadata(self, options_validator_factories, source_type, source): + validator, options, factories = options_validator_factories + m = StreamMetadata(validator, options, factories) + m.metadata_source = source_type + if source_type == StreamMetadata.FROM_BLOB: + m.source_blob_hash = source + if source_type == StreamMetadata.FROM_PLAIN: + m.source_file = source + return m + def get_metadata_for_sd_file(self, sd_path): sd_reader = PlainStreamDescriptorReader(sd_path) d = sd_reader.get_info() - d.addCallback(self._return_info_and_factories) + d.addCallback(self._return_options_and_validator_and_factories) + d.addCallback(self._return_metadata, StreamMetadata.FROM_PLAIN, sd_path) return d - def get_info_and_factories_for_sd_blob(self, sd_blob): + def get_metadata_for_sd_blob(self, sd_blob): sd_reader = BlobStreamDescriptorReader(sd_blob) d = sd_reader.get_info() - d.addCallback(self._return_info_and_factories) + d.addCallback(self._return_options_and_validator_and_factories) + d.addCallback(self._return_metadata, StreamMetadata.FROM_BLOB, sd_blob.blob_hash) return d def _get_factories(self, stream_type): @@ -183,7 +207,7 @@ class StreamDescriptorIdentifier(object): raise UnknownStreamTypeError(stream_type) return self._stream_options[stream_type] - def _return_info_and_factories(self, sd_info): + def _return_options_and_validator_and_factories(self, sd_info): if not 'stream_type' in sd_info: raise InvalidStreamDescriptorError('No stream_type parameter in stream descriptor.') stream_type = sd_info['stream_type'] diff --git a/lbrynet/lbryfile/StreamDescriptor.py b/lbrynet/lbryfile/StreamDescriptor.py index be802fec2..2f194fe21 100644 --- a/lbrynet/lbryfile/StreamDescriptor.py +++ b/lbrynet/lbryfile/StreamDescriptor.py @@ -124,6 +124,7 @@ class LBRYFileStreamDescriptorValidator(object): h.update(blobs_hashsum.digest()) if h.hexdigest() != stream_hash: raise InvalidStreamDescriptorError("Stream hash does not match stream metadata") + log.debug("It is validated") return defer.succeed(True) def info_to_show(self): diff --git a/lbrynet/lbryfile/client/LBRYFileDownloader.py b/lbrynet/lbryfile/client/LBRYFileDownloader.py index 1733ee10b..097fb170c 100644 --- a/lbrynet/lbryfile/client/LBRYFileDownloader.py +++ b/lbrynet/lbryfile/client/LBRYFileDownloader.py @@ -6,6 +6,7 @@ from zope.interface import implements from lbrynet.lbryfile.StreamDescriptor import save_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager +from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.client.LBRYFileMetadataHandler import LBRYFileMetadataHandler import os @@ -97,19 +98,27 @@ class LBRYFileDownloaderFactory(object): def can_download(self, sd_validator): return True - def make_downloader(self, sd_validator, options, payment_rate_manager, **kwargs): + def make_downloader(self, metadata, options, payment_rate_manager, **kwargs): payment_rate_manager.min_blob_data_payment_rate = options[0] upload_allowed = options[1] + def save_source_if_blob(stream_hash): + if metadata.metadata_source == StreamMetadata.FROM_BLOB: + d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash) + else: + d = defer.succeed(True) + d.addCallback(lambda _: stream_hash) + return d + def create_downloader(stream_hash): - downloader = self._make_downloader(stream_hash, payment_rate_manager, sd_validator.raw_info, - upload_allowed) + downloader = self._make_downloader(stream_hash, payment_rate_manager, + metadata.validator.raw_info, upload_allowed) d = downloader.set_stream_info() d.addCallback(lambda _: downloader) return d - d = save_sd_info(self.stream_info_manager, sd_validator.raw_info) - + d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info) + d.addCallback(save_source_if_blob) d.addCallback(create_downloader) return d @@ -197,7 +206,8 @@ class LBRYFileSaverFactory(LBRYFileDownloaderFactory): self.stream_info_manager, payment_rate_manager, self.wallet, self.download_directory, upload_allowed) - def get_description(self): + @staticmethod + def get_description(): return "Save" @@ -276,5 +286,6 @@ class LBRYFileOpenerFactory(LBRYFileDownloaderFactory): return LBRYFileOpener(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.stream_info_manager, payment_rate_manager, self.wallet, upload_allowed) - def get_description(self): + @staticmethod + def get_description(): return "Stream" \ No newline at end of file diff --git a/lbrynet/lbryfilemanager/LBRYFileDownloader.py b/lbrynet/lbryfilemanager/LBRYFileDownloader.py index bc00993b5..1fdae8659 100644 --- a/lbrynet/lbryfilemanager/LBRYFileDownloader.py +++ b/lbrynet/lbryfilemanager/LBRYFileDownloader.py @@ -4,6 +4,7 @@ Download LBRY Files from LBRYnet and save them to disk. from zope.interface import implements from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager +from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaver, LBRYFileDownloader from lbrynet.lbryfilemanager.LBRYFileStatusReport import LBRYFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory @@ -117,16 +118,27 @@ class ManagedLBRYFileDownloaderFactory(object): def can_download(self, sd_validator): return True - def make_downloader(self, sd_validator, options, payment_rate_manager): + def make_downloader(self, metadata, options, payment_rate_manager): data_rate = options[0] upload_allowed = options[1] - d = save_sd_info(self.lbry_file_manager.stream_info_manager, sd_validator.raw_info) + def save_source_if_blob(stream_hash): + if metadata.metadata_source == StreamMetadata.FROM_BLOB: + d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, + metadata.source_blob_hash) + else: + d = defer.succeed(True) + d.addCallback(lambda _: stream_hash) + return d + + d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info) + d.addCallback(save_source_if_blob) d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager, data_rate, upload_allowed)) return d - def get_description(self): + @staticmethod + def get_description(): return "Save the file to disk" \ No newline at end of file diff --git a/lbrynet/lbrylive/client/LiveStreamDownloader.py b/lbrynet/lbrylive/client/LiveStreamDownloader.py index 2c73259f4..10fe4e65f 100644 --- a/lbrynet/lbrylive/client/LiveStreamDownloader.py +++ b/lbrynet/lbrylive/client/LiveStreamDownloader.py @@ -1,4 +1,5 @@ import binascii +from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader from zope.interface import implements from lbrynet.lbrylive.client.LiveStreamMetadataHandler import LiveStreamMetadataHandler @@ -85,9 +86,9 @@ class FullLiveStreamDownloader(LiveStreamDownloader): d.addCallback(lambda _: set_file_name_if_unset()) return d - def stop(self): + def stop(self, err=None): d = self._close_file() - d.addBoth(lambda _: LiveStreamDownloader.stop(self)) + d.addBoth(lambda _: LiveStreamDownloader.stop(self, err)) return d def _start(self): @@ -141,11 +142,21 @@ class FullLiveStreamDownloaderFactory(object): def can_download(self, sd_validator): return True - def make_downloader(self, sd_validator, options, payment_rate_manager): + def make_downloader(self, metadata, options, payment_rate_manager): # TODO: check options for payment rate manager parameters payment_rate_manager = LiveStreamPaymentRateManager(self.default_payment_rate_manager, payment_rate_manager) - d = save_sd_info(self.stream_info_manager, sd_validator.raw_info) + + def save_source_if_blob(stream_hash): + if metadata.metadata_source == StreamMetadata.FROM_BLOB: + d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash) + else: + d = defer.succeed(True) + d.addCallback(lambda _: stream_hash) + return d + + d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info) + d.addCallback(save_source_if_blob) def create_downloader(stream_hash): stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter, diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index c6713b4bb..32a2dd249 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -259,11 +259,9 @@ class AddStream(ControlHandler): def __init__(self, sd_identifier, base_payment_rate_manager): self.sd_identifier = sd_identifier - self.loading_info_and_factories_deferred = None - self.factories = None + self.loading_metadata_deferred = None + self.metadata = None self.factory = None - self.info_validator = None - self.options = None self.options_left = [] self.options_chosen = [] self.current_option = None @@ -278,27 +276,27 @@ class AddStream(ControlHandler): return False, defer.succeed(self.line_prompt) if self.loading_failed is True: return True, None - if self.loading_info_and_factories_deferred is not None: + if self.loading_metadata_deferred is not None: if line.lower() == "cancel": - self.loading_info_and_factories_deferred.cancel() - self.loading_info_and_factories_deferred = None + self.loading_metadata_deferred.cancel() + self.loading_metadata_deferred = None return True, None else: return False, defer.succeed(self.cancel_prompt) - if self.factories is None: - self.loading_info_and_factories_deferred = self._load_info_and_factories(line) + if self.metadata is None: + self.loading_metadata_deferred = self._load_metadata(line) cancel_prompt_d = defer.succeed(self.cancel_prompt) - self.loading_info_and_factories_deferred.addCallback(self._choose_factory) - self.loading_info_and_factories_deferred.addErrback(self._handle_load_canceled) - self.loading_info_and_factories_deferred.addErrback(self._handle_load_failed) - return False, cancel_prompt_d, self.loading_info_and_factories_deferred + self.loading_metadata_deferred.addCallback(self._choose_factory) + self.loading_metadata_deferred.addErrback(self._handle_load_canceled) + self.loading_metadata_deferred.addErrback(self._handle_load_failed) + return False, cancel_prompt_d, self.loading_metadata_deferred if self.factory is None: try: choice = int(line) except ValueError: return False, defer.succeed(self._show_factory_choices()) - if choice in xrange(len(self.factories)): - self.factory = self.factories[choice] + if choice in xrange(len(self.metadata.factories)): + self.factory = self.metadata.factories[choice] return False, defer.succeed(self._show_info_and_options()) else: return False, defer.succeed(self._show_factory_choices()) @@ -350,7 +348,7 @@ class AddStream(ControlHandler): return choice_num raise InvalidChoiceError() - def _load_info_and_factories(self, sd_file): + def _load_metadata(self, sd_file): return defer.fail(NotImplementedError()) def _handle_load_canceled(self, err): @@ -364,25 +362,25 @@ class AddStream(ControlHandler): "See console.log for further details.\n\n" "Press enter to continue") - def _choose_factory(self, info_and_factories): - self.loading_info_and_factories_deferred = None - self.info_validator, self.options, self.factories = info_and_factories - if len(self.factories) == 1: - self.factory = self.factories[0] + def _choose_factory(self, metadata): + self.loading_metadata_deferred = None + self.metadata = metadata + if len(self.metadata.factories) == 1: + self.factory = self.metadata.factories[0] return self._show_info_and_options() return self._show_factory_choices() def _show_factory_choices(self): prompt = "Choose what to do with the file:\n" - for i, factory in enumerate(self.factories): + for i, factory in enumerate(self.metadata.factories): prompt += "[" + str(i) + "] " + factory.get_description() + '\n' return str(prompt) def _show_info_and_options(self): - self.options_left = self.options.get_downloader_options(self.info_validator, - self.payment_rate_manager) + self.options_left = self.metadata.options.get_downloader_options(self.metadata.validator, + self.payment_rate_manager) prompt = "Stream info:\n" - for info_line in self.info_validator.info_to_show(): + for info_line in self.metadata.validator.info_to_show(): prompt += info_line[0] + ": " + info_line[1] + "\n" prompt += "\nOptions:\n" for option in self.options_left: @@ -460,7 +458,7 @@ class AddStream(ControlHandler): return "An unexpected error has caused the download to stop. See console.log for details." def _make_downloader(self): - return self.factory.make_downloader(self.info_validator, self.options_chosen, + return self.factory.make_downloader(self.metadata, self.options_chosen, self.payment_rate_manager) @@ -468,8 +466,8 @@ class AddStreamFromSD(AddStream): prompt_description = "Add a stream from a stream descriptor file" line_prompt = "Stream descriptor file name:" - def _load_info_and_factories(self, sd_file): - return self.sd_identifier.get_info_and_factories_for_sd_file(sd_file) + def _load_metadata(self, sd_file): + return self.sd_identifier.get_metadata_for_sd_file(sd_file) class AddStreamFromSDFactory(ControlHandlerFactory): @@ -484,9 +482,9 @@ class AddStreamFromHash(AddStream): AddStream.__init__(self, sd_identifier, session.base_payment_rate_manager) self.session = session - def _load_info_and_factories(self, sd_hash): + def _load_metadata(self, sd_hash): d = download_sd_blob(self.session, sd_hash, self.payment_rate_manager) - d.addCallback(self.sd_identifier.get_info_and_factories_for_sd_blob) + d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) return d def _handle_load_failed(self, err): @@ -511,9 +509,9 @@ class AddStreamFromLBRYcrdName(AddStreamFromHash): AddStreamFromHash.__init__(self, sd_identifier, session) self.name_resolver = name_resolver - def _load_info_and_factories(self, name): + def _load_metadata(self, name): d = self._resolve_name(name) - d.addCallback(lambda stream_hash: AddStreamFromHash._load_info_and_factories(self, stream_hash)) + d.addCallback(lambda stream_hash: AddStreamFromHash._load_metadata(self, stream_hash)) return d def _resolve_name(self, name): @@ -991,7 +989,7 @@ class ClaimName(ControlHandler): choice = -1 if choice < 0 or choice >= len(self.file_type_options): return False, defer.succeed("You must enter a valid number.\n\n%s" % self._get_file_type_options()) - if self.file_type_options[choice] is None: + if self.file_type_options[choice][0] is None: return True, defer.succeed("Publishing canceled.") self.file_type_chosen = self.file_type_options[choice][0] if self.file_type_chosen == "hash": @@ -1079,7 +1077,7 @@ class ClaimName(ControlHandler): def get_validator_for_blob(blob): if not blob.verified: return None - d = self.sd_identifier.get_info_and_factories_for_sd_blob(blob) + d = self.sd_identifier.get_metadata_for_sd_blob(blob) d.addCallback(lambda v_o_f: v_o_f[0]) return d diff --git a/lbrynet/lbrynet_gui/LBRYGui.py b/lbrynet/lbrynet_gui/LBRYGui.py index 5507c5289..e62ed475f 100644 --- a/lbrynet/lbrynet_gui/LBRYGui.py +++ b/lbrynet/lbrynet_gui/LBRYGui.py @@ -365,7 +365,7 @@ class LBRYDownloader(object): stream_frame.show_metadata_status("name resolved, fetching metadata...") get_sd_d = StreamDescriptor.download_sd_blob(self.session, sd_hash, payment_rate_manager) - get_sd_d.addCallback(self.sd_identifier.get_info_and_factories_for_sd_blob) + get_sd_d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) get_sd_d.addCallbacks(choose_download_factory, bad_sd_blob) return get_sd_d @@ -385,9 +385,9 @@ class LBRYDownloader(object): stream_name = "unknown" return stream_name, stream_size - def choose_download_factory(info_and_factories): - info_validator, options, factories = info_and_factories - stream_name, stream_size = get_info_from_validator(info_validator) + def choose_download_factory(metadata): + #info_validator, options, factories = info_and_factories + stream_name, stream_size = get_info_from_validator(metadata.validator) if isinstance(stream_size, (int, long)): price = payment_rate_manager.get_effective_min_blob_data_payment_rate() estimated_cost = stream_size * 1.0 / 2**20 * price @@ -396,7 +396,8 @@ class LBRYDownloader(object): stream_frame.show_stream_metadata(stream_name, stream_size) - available_options = options.get_downloader_options(info_validator, payment_rate_manager) + available_options = metadata.options.get_downloader_options(metadata.validator, + payment_rate_manager) stream_frame.show_download_options(available_options) @@ -409,11 +410,11 @@ class LBRYDownloader(object): get_downloader_d.callback(downloader) stream_frame.disable_download_buttons() - d = f.make_downloader(info_validator, chosen_options, + d = f.make_downloader(metadata, chosen_options, payment_rate_manager) d.addCallback(fire_get_downloader_d) - for factory in factories: + for factory in metadata.factories: def choose_factory(f=factory): chosen_options = stream_frame.get_chosen_options() diff --git a/tests/functional_tests.py b/tests/functional_tests.py index 43ef7573f..38c3253f8 100644 --- a/tests/functional_tests.py +++ b/tests/functional_tests.py @@ -677,15 +677,17 @@ class TestTransfer(TestCase): self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier) - def make_downloader(info_and_factories, prm): - info_validator, options, factories = info_and_factories + def make_downloader(metadata, prm): + info_validator = metadata.validator + options = metadata.options + factories = metadata.factories chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] - return factories[0].make_downloader(info_validator, chosen_options, prm) + return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(sd_hash): prm = PaymentRateManager(self.session.base_payment_rate_manager) d = download_sd_blob(self.session, sd_hash, prm) - d.addCallback(sd_identifier.get_info_and_factories_for_sd_blob) + d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(make_downloader, prm) d.addCallback(lambda downloader: downloader.start()) return d @@ -758,10 +760,12 @@ class TestTransfer(TestCase): d = self.wait_for_hash_from_queue(sd_hash_queue) - def create_downloader(info_and_factories, prm): - info_validator, options, factories = info_and_factories + def create_downloader(metadata, prm): + info_validator = metadata.validator + options = metadata.options + factories = metadata.factories chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] - return factories[0].make_downloader(info_validator, chosen_options, prm) + return factories[0].make_downloader(metadata, chosen_options, prm) def start_lbry_file(lbry_file): lbry_file = lbry_file @@ -772,7 +776,7 @@ class TestTransfer(TestCase): logging.debug("Downloaded the sd blob. Reading it now") prm = PaymentRateManager(self.session.base_payment_rate_manager) d = download_sd_blob(self.session, sd_blob_hash, prm) - d.addCallback(sd_identifier.get_info_and_factories_for_sd_blob) + d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(create_downloader, prm) d.addCallback(start_lbry_file) return d