make class to encompass stream metadata

This commit is contained in:
Jimmy Kiselak 2015-09-21 22:06:04 -04:00
parent be1e97c616
commit f94b04bb19
8 changed files with 130 additions and 68 deletions

View file

@ -107,6 +107,19 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter):
return blob_creator.close()
class StreamMetadata(object):
FROM_BLOB = 1
FROM_PLAIN = 2
def __init__(self, validator, options, factories):
self.validator = validator
self.options = options
self.factories = factories
self.metadata_source = None
self.source_blob_hash = None
self.source_file = None
class StreamDescriptorIdentifier(object):
"""Tries to determine the type of stream described by the stream descriptor using the
'stream_type' field. Keeps a list of StreamDescriptorValidators and StreamDownloaderFactorys
@ -155,17 +168,28 @@ class StreamDescriptorIdentifier(object):
"""
self._stream_downloader_factories[stream_type].append(factory)
def get_info_and_factories_for_sd_file(self, sd_path):
def _return_metadata(self, options_validator_factories, source_type, source):
validator, options, factories = options_validator_factories
m = StreamMetadata(validator, options, factories)
m.metadata_source = source_type
if source_type == StreamMetadata.FROM_BLOB:
m.source_blob_hash = source
if source_type == StreamMetadata.FROM_PLAIN:
m.source_file = source
return m
def get_metadata_for_sd_file(self, sd_path):
sd_reader = PlainStreamDescriptorReader(sd_path)
d = sd_reader.get_info()
d.addCallback(self._return_info_and_factories)
d.addCallback(self._return_options_and_validator_and_factories)
d.addCallback(self._return_metadata, StreamMetadata.FROM_PLAIN, sd_path)
return d
def get_info_and_factories_for_sd_blob(self, sd_blob):
def get_metadata_for_sd_blob(self, sd_blob):
sd_reader = BlobStreamDescriptorReader(sd_blob)
d = sd_reader.get_info()
d.addCallback(self._return_info_and_factories)
d.addCallback(self._return_options_and_validator_and_factories)
d.addCallback(self._return_metadata, StreamMetadata.FROM_BLOB, sd_blob.blob_hash)
return d
def _get_factories(self, stream_type):
@ -183,7 +207,7 @@ class StreamDescriptorIdentifier(object):
raise UnknownStreamTypeError(stream_type)
return self._stream_options[stream_type]
def _return_info_and_factories(self, sd_info):
def _return_options_and_validator_and_factories(self, sd_info):
if not 'stream_type' in sd_info:
raise InvalidStreamDescriptorError('No stream_type parameter in stream descriptor.')
stream_type = sd_info['stream_type']

View file

@ -124,6 +124,7 @@ class LBRYFileStreamDescriptorValidator(object):
h.update(blobs_hashsum.digest())
if h.hexdigest() != stream_hash:
raise InvalidStreamDescriptorError("Stream hash does not match stream metadata")
log.debug("It is validated")
return defer.succeed(True)
def info_to_show(self):

View file

@ -6,6 +6,7 @@ from zope.interface import implements
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbryfile.client.LBRYFileMetadataHandler import LBRYFileMetadataHandler
import os
@ -97,19 +98,27 @@ class LBRYFileDownloaderFactory(object):
def can_download(self, sd_validator):
return True
def make_downloader(self, sd_validator, options, payment_rate_manager, **kwargs):
def make_downloader(self, metadata, options, payment_rate_manager, **kwargs):
payment_rate_manager.min_blob_data_payment_rate = options[0]
upload_allowed = options[1]
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
return d
def create_downloader(stream_hash):
downloader = self._make_downloader(stream_hash, payment_rate_manager, sd_validator.raw_info,
upload_allowed)
downloader = self._make_downloader(stream_hash, payment_rate_manager,
metadata.validator.raw_info, upload_allowed)
d = downloader.set_stream_info()
d.addCallback(lambda _: downloader)
return d
d = save_sd_info(self.stream_info_manager, sd_validator.raw_info)
d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
d.addCallback(create_downloader)
return d
@ -197,7 +206,8 @@ class LBRYFileSaverFactory(LBRYFileDownloaderFactory):
self.stream_info_manager, payment_rate_manager, self.wallet,
self.download_directory, upload_allowed)
def get_description(self):
@staticmethod
def get_description():
return "Save"
@ -276,5 +286,6 @@ class LBRYFileOpenerFactory(LBRYFileDownloaderFactory):
return LBRYFileOpener(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager,
self.stream_info_manager, payment_rate_manager, self.wallet, upload_allowed)
def get_description(self):
@staticmethod
def get_description():
return "Stream"

View file

@ -4,6 +4,7 @@ Download LBRY Files from LBRYnet and save them to disk.
from zope.interface import implements
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaver, LBRYFileDownloader
from lbrynet.lbryfilemanager.LBRYFileStatusReport import LBRYFileStatusReport
from lbrynet.interfaces import IStreamDownloaderFactory
@ -117,16 +118,27 @@ class ManagedLBRYFileDownloaderFactory(object):
def can_download(self, sd_validator):
return True
def make_downloader(self, sd_validator, options, payment_rate_manager):
def make_downloader(self, metadata, options, payment_rate_manager):
data_rate = options[0]
upload_allowed = options[1]
d = save_sd_info(self.lbry_file_manager.stream_info_manager, sd_validator.raw_info)
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
return d
d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file(stream_hash,
payment_rate_manager,
data_rate,
upload_allowed))
return d
def get_description(self):
@staticmethod
def get_description():
return "Save the file to disk"

View file

@ -1,4 +1,5 @@
import binascii
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader
from zope.interface import implements
from lbrynet.lbrylive.client.LiveStreamMetadataHandler import LiveStreamMetadataHandler
@ -85,9 +86,9 @@ class FullLiveStreamDownloader(LiveStreamDownloader):
d.addCallback(lambda _: set_file_name_if_unset())
return d
def stop(self):
def stop(self, err=None):
d = self._close_file()
d.addBoth(lambda _: LiveStreamDownloader.stop(self))
d.addBoth(lambda _: LiveStreamDownloader.stop(self, err))
return d
def _start(self):
@ -141,11 +142,21 @@ class FullLiveStreamDownloaderFactory(object):
def can_download(self, sd_validator):
return True
def make_downloader(self, sd_validator, options, payment_rate_manager):
def make_downloader(self, metadata, options, payment_rate_manager):
# TODO: check options for payment rate manager parameters
payment_rate_manager = LiveStreamPaymentRateManager(self.default_payment_rate_manager,
payment_rate_manager)
d = save_sd_info(self.stream_info_manager, sd_validator.raw_info)
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
return d
d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
def create_downloader(stream_hash):
stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter,

View file

@ -259,11 +259,9 @@ class AddStream(ControlHandler):
def __init__(self, sd_identifier, base_payment_rate_manager):
self.sd_identifier = sd_identifier
self.loading_info_and_factories_deferred = None
self.factories = None
self.loading_metadata_deferred = None
self.metadata = None
self.factory = None
self.info_validator = None
self.options = None
self.options_left = []
self.options_chosen = []
self.current_option = None
@ -278,27 +276,27 @@ class AddStream(ControlHandler):
return False, defer.succeed(self.line_prompt)
if self.loading_failed is True:
return True, None
if self.loading_info_and_factories_deferred is not None:
if self.loading_metadata_deferred is not None:
if line.lower() == "cancel":
self.loading_info_and_factories_deferred.cancel()
self.loading_info_and_factories_deferred = None
self.loading_metadata_deferred.cancel()
self.loading_metadata_deferred = None
return True, None
else:
return False, defer.succeed(self.cancel_prompt)
if self.factories is None:
self.loading_info_and_factories_deferred = self._load_info_and_factories(line)
if self.metadata is None:
self.loading_metadata_deferred = self._load_metadata(line)
cancel_prompt_d = defer.succeed(self.cancel_prompt)
self.loading_info_and_factories_deferred.addCallback(self._choose_factory)
self.loading_info_and_factories_deferred.addErrback(self._handle_load_canceled)
self.loading_info_and_factories_deferred.addErrback(self._handle_load_failed)
return False, cancel_prompt_d, self.loading_info_and_factories_deferred
self.loading_metadata_deferred.addCallback(self._choose_factory)
self.loading_metadata_deferred.addErrback(self._handle_load_canceled)
self.loading_metadata_deferred.addErrback(self._handle_load_failed)
return False, cancel_prompt_d, self.loading_metadata_deferred
if self.factory is None:
try:
choice = int(line)
except ValueError:
return False, defer.succeed(self._show_factory_choices())
if choice in xrange(len(self.factories)):
self.factory = self.factories[choice]
if choice in xrange(len(self.metadata.factories)):
self.factory = self.metadata.factories[choice]
return False, defer.succeed(self._show_info_and_options())
else:
return False, defer.succeed(self._show_factory_choices())
@ -350,7 +348,7 @@ class AddStream(ControlHandler):
return choice_num
raise InvalidChoiceError()
def _load_info_and_factories(self, sd_file):
def _load_metadata(self, sd_file):
return defer.fail(NotImplementedError())
def _handle_load_canceled(self, err):
@ -364,25 +362,25 @@ class AddStream(ControlHandler):
"See console.log for further details.\n\n"
"Press enter to continue")
def _choose_factory(self, info_and_factories):
self.loading_info_and_factories_deferred = None
self.info_validator, self.options, self.factories = info_and_factories
if len(self.factories) == 1:
self.factory = self.factories[0]
def _choose_factory(self, metadata):
self.loading_metadata_deferred = None
self.metadata = metadata
if len(self.metadata.factories) == 1:
self.factory = self.metadata.factories[0]
return self._show_info_and_options()
return self._show_factory_choices()
def _show_factory_choices(self):
prompt = "Choose what to do with the file:\n"
for i, factory in enumerate(self.factories):
for i, factory in enumerate(self.metadata.factories):
prompt += "[" + str(i) + "] " + factory.get_description() + '\n'
return str(prompt)
def _show_info_and_options(self):
self.options_left = self.options.get_downloader_options(self.info_validator,
self.payment_rate_manager)
self.options_left = self.metadata.options.get_downloader_options(self.metadata.validator,
self.payment_rate_manager)
prompt = "Stream info:\n"
for info_line in self.info_validator.info_to_show():
for info_line in self.metadata.validator.info_to_show():
prompt += info_line[0] + ": " + info_line[1] + "\n"
prompt += "\nOptions:\n"
for option in self.options_left:
@ -460,7 +458,7 @@ class AddStream(ControlHandler):
return "An unexpected error has caused the download to stop. See console.log for details."
def _make_downloader(self):
return self.factory.make_downloader(self.info_validator, self.options_chosen,
return self.factory.make_downloader(self.metadata, self.options_chosen,
self.payment_rate_manager)
@ -468,8 +466,8 @@ class AddStreamFromSD(AddStream):
prompt_description = "Add a stream from a stream descriptor file"
line_prompt = "Stream descriptor file name:"
def _load_info_and_factories(self, sd_file):
return self.sd_identifier.get_info_and_factories_for_sd_file(sd_file)
def _load_metadata(self, sd_file):
return self.sd_identifier.get_metadata_for_sd_file(sd_file)
class AddStreamFromSDFactory(ControlHandlerFactory):
@ -484,9 +482,9 @@ class AddStreamFromHash(AddStream):
AddStream.__init__(self, sd_identifier, session.base_payment_rate_manager)
self.session = session
def _load_info_and_factories(self, sd_hash):
def _load_metadata(self, sd_hash):
d = download_sd_blob(self.session, sd_hash, self.payment_rate_manager)
d.addCallback(self.sd_identifier.get_info_and_factories_for_sd_blob)
d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
return d
def _handle_load_failed(self, err):
@ -511,9 +509,9 @@ class AddStreamFromLBRYcrdName(AddStreamFromHash):
AddStreamFromHash.__init__(self, sd_identifier, session)
self.name_resolver = name_resolver
def _load_info_and_factories(self, name):
def _load_metadata(self, name):
d = self._resolve_name(name)
d.addCallback(lambda stream_hash: AddStreamFromHash._load_info_and_factories(self, stream_hash))
d.addCallback(lambda stream_hash: AddStreamFromHash._load_metadata(self, stream_hash))
return d
def _resolve_name(self, name):
@ -991,7 +989,7 @@ class ClaimName(ControlHandler):
choice = -1
if choice < 0 or choice >= len(self.file_type_options):
return False, defer.succeed("You must enter a valid number.\n\n%s" % self._get_file_type_options())
if self.file_type_options[choice] is None:
if self.file_type_options[choice][0] is None:
return True, defer.succeed("Publishing canceled.")
self.file_type_chosen = self.file_type_options[choice][0]
if self.file_type_chosen == "hash":
@ -1079,7 +1077,7 @@ class ClaimName(ControlHandler):
def get_validator_for_blob(blob):
if not blob.verified:
return None
d = self.sd_identifier.get_info_and_factories_for_sd_blob(blob)
d = self.sd_identifier.get_metadata_for_sd_blob(blob)
d.addCallback(lambda v_o_f: v_o_f[0])
return d

View file

@ -365,7 +365,7 @@ class LBRYDownloader(object):
stream_frame.show_metadata_status("name resolved, fetching metadata...")
get_sd_d = StreamDescriptor.download_sd_blob(self.session, sd_hash,
payment_rate_manager)
get_sd_d.addCallback(self.sd_identifier.get_info_and_factories_for_sd_blob)
get_sd_d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
get_sd_d.addCallbacks(choose_download_factory, bad_sd_blob)
return get_sd_d
@ -385,9 +385,9 @@ class LBRYDownloader(object):
stream_name = "unknown"
return stream_name, stream_size
def choose_download_factory(info_and_factories):
info_validator, options, factories = info_and_factories
stream_name, stream_size = get_info_from_validator(info_validator)
def choose_download_factory(metadata):
#info_validator, options, factories = info_and_factories
stream_name, stream_size = get_info_from_validator(metadata.validator)
if isinstance(stream_size, (int, long)):
price = payment_rate_manager.get_effective_min_blob_data_payment_rate()
estimated_cost = stream_size * 1.0 / 2**20 * price
@ -396,7 +396,8 @@ class LBRYDownloader(object):
stream_frame.show_stream_metadata(stream_name, stream_size)
available_options = options.get_downloader_options(info_validator, payment_rate_manager)
available_options = metadata.options.get_downloader_options(metadata.validator,
payment_rate_manager)
stream_frame.show_download_options(available_options)
@ -409,11 +410,11 @@ class LBRYDownloader(object):
get_downloader_d.callback(downloader)
stream_frame.disable_download_buttons()
d = f.make_downloader(info_validator, chosen_options,
d = f.make_downloader(metadata, chosen_options,
payment_rate_manager)
d.addCallback(fire_get_downloader_d)
for factory in factories:
for factory in metadata.factories:
def choose_factory(f=factory):
chosen_options = stream_frame.get_chosen_options()

View file

@ -677,15 +677,17 @@ class TestTransfer(TestCase):
self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier)
def make_downloader(info_and_factories, prm):
info_validator, options, factories = info_and_factories
def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(info_validator, chosen_options, prm)
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
d = download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_info_and_factories_for_sd_blob)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
d.addCallback(lambda downloader: downloader.start())
return d
@ -758,10 +760,12 @@ class TestTransfer(TestCase):
d = self.wait_for_hash_from_queue(sd_hash_queue)
def create_downloader(info_and_factories, prm):
info_validator, options, factories = info_and_factories
def create_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(info_validator, chosen_options, prm)
return factories[0].make_downloader(metadata, chosen_options, prm)
def start_lbry_file(lbry_file):
lbry_file = lbry_file
@ -772,7 +776,7 @@ class TestTransfer(TestCase):
logging.debug("Downloaded the sd blob. Reading it now")
prm = PaymentRateManager(self.session.base_payment_rate_manager)
d = download_sd_blob(self.session, sd_blob_hash, prm)
d.addCallback(sd_identifier.get_info_and_factories_for_sd_blob)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(create_downloader, prm)
d.addCallback(start_lbry_file)
return d