diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index c99465673..89560968c 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -19,6 +19,16 @@ class CryptBlobInfo(BlobInfo): BlobInfo.__init__(self, blob_hash, blob_num, length) self.iv = iv + def get_dict(self): + info = { + "blob_num": self.blob_num, + "length": self.length, + "iv": self.iv + } + if self.blob_hash: + info['blob_hash'] = self.blob_hash + return info + class StreamBlobDecryptor(object): def __init__(self, blob, key, iv, length): diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index bf6d3bea7..1598be3ff 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -5,13 +5,13 @@ Utilities for turning plain files into LBRY Files. import binascii import logging import os -from lbrynet.core.StreamDescriptor import PlainStreamDescriptorWriter -from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator -from lbrynet import conf -from lbrynet.lbry_file.StreamDescriptor import get_sd_info -from lbrynet.core.cryptoutils import get_lbry_hash_obj + +from twisted.internet import defer from twisted.protocols.basic import FileSender +from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType +from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash +from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator log = logging.getLogger(__name__) @@ -20,58 +20,32 @@ class EncryptedFileStreamCreator(CryptStreamCreator): """ A CryptStreamCreator which adds itself and its additional metadata to an EncryptedFileManager """ - def __init__(self, blob_manager, lbry_file_manager, name=None, - key=None, iv_generator=None, suggested_file_name=None): - CryptStreamCreator.__init__(self, blob_manager, name, key, iv_generator) + + def __init__(self, blob_manager, lbry_file_manager, stream_name=None, + key=None, iv_generator=None): + CryptStreamCreator.__init__(self, blob_manager, stream_name, key, iv_generator) self.lbry_file_manager = lbry_file_manager - self.suggested_file_name = suggested_file_name or name self.stream_hash = None self.blob_infos = [] + self.sd_info = None def _blob_finished(self, blob_info): log.debug("length: %s", blob_info.length) - self.blob_infos.append(blob_info) + self.blob_infos.append(blob_info.get_dict()) return blob_info - def _save_stream_info(self): - stream_info_manager = self.lbry_file_manager.stream_info_manager - d = stream_info_manager.save_stream(self.stream_hash, hexlify(self.name), - hexlify(self.key), - hexlify(self.suggested_file_name), - self.blob_infos) - return d - - def _get_blobs_hashsum(self): - blobs_hashsum = get_lbry_hash_obj() - for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num): - length = blob_info.length - if length != 0: - blob_hash = blob_info.blob_hash - else: - blob_hash = None - blob_num = blob_info.blob_num - iv = blob_info.iv - blob_hashsum = get_lbry_hash_obj() - if length != 0: - blob_hashsum.update(blob_hash) - blob_hashsum.update(str(blob_num)) - blob_hashsum.update(iv) - blob_hashsum.update(str(length)) - blobs_hashsum.update(blob_hashsum.digest()) - return blobs_hashsum.digest() - - def _make_stream_hash(self): - hashsum = get_lbry_hash_obj() - hashsum.update(hexlify(self.name)) - hashsum.update(hexlify(self.key)) - hashsum.update(hexlify(self.suggested_file_name)) - hashsum.update(self._get_blobs_hashsum()) - self.stream_hash = hashsum.hexdigest() - def _finished(self): - self._make_stream_hash() - d = self._save_stream_info() - return d + # calculate the stream hash + self.stream_hash = get_stream_hash( + hexlify(self.name), hexlify(self.key), hexlify(self.name), + self.blob_infos + ) + # generate the sd info + self.sd_info = format_sd_info( + EncryptedFileStreamType, hexlify(self.name), hexlify(self.key), + hexlify(self.name), self.stream_hash, self.blob_infos + ) + return defer.succeed(self.stream_hash) # TODO: this should be run its own thread. Encrypting a large file can @@ -80,8 +54,8 @@ class EncryptedFileStreamCreator(CryptStreamCreator): # great when sending over the network, but this is all local so # we can simply read the file from the disk without needing to # involve reactor. -def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, - iv_generator=None, suggested_file_name=None): +@defer.inlineCallbacks +def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, iv_generator=None): """Turn a plain file into an LBRY File. An LBRY File is a collection of encrypted blobs of data and the metadata that binds them @@ -104,10 +78,6 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non @param file_handle: The file-like object to read @type file_handle: any file-like object which can be read by twisted.protocols.basic.FileSender - @param secret_pass_phrase: A string that will be used to generate the public key. If None, a - random string will be used. - @type secret_pass_phrase: string - @param key: the raw AES key which will be used to encrypt the blobs. If None, a random key will be generated. @type key: string @@ -116,53 +86,44 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non vectors for the blobs. Will be called once for each blob. @type iv_generator: a generator function which yields strings - @param suggested_file_name: what the file should be called when the LBRY File is saved to disk. - @type suggested_file_name: string - @return: a Deferred which fires with the stream_hash of the LBRY File @rtype: Deferred which fires with hex-encoded string """ - def stop_file(creator): - log.debug("the file sender has triggered its deferred. stopping the stream writer") - return creator.stop() - - def make_stream_desc_file(stream_hash): - log.debug("creating the stream descriptor file") - descriptor_file_path = os.path.join( - session.db_dir, file_name + conf.settings['CRYPTSD_FILE_EXTENSION']) - descriptor_writer = PlainStreamDescriptorWriter(descriptor_file_path) - - d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True) - - d.addCallback(descriptor_writer.create_descriptor) - - return d - base_file_name = os.path.basename(file_name) + file_directory = os.path.dirname(file_handle.name) lbry_file_creator = EncryptedFileStreamCreator( - session.blob_manager, - lbry_file_manager, - base_file_name, key, - iv_generator, - suggested_file_name) + session.blob_manager, lbry_file_manager, base_file_name, key, iv_generator + ) - def start_stream(): - # TODO: Using FileSender isn't necessary, we can just read - # straight from the disk. The stream creation process - # should be in its own thread anyway so we don't need to - # worry about interacting with the twisted reactor - file_sender = FileSender() - d = file_sender.beginFileTransfer(file_handle, lbry_file_creator) - d.addCallback(lambda _: stop_file(lbry_file_creator)) - d.addCallback(lambda _: make_stream_desc_file(lbry_file_creator.stream_hash)) - d.addCallback(lambda _: lbry_file_creator.stream_hash) - return d + yield lbry_file_creator.setup() + # TODO: Using FileSender isn't necessary, we can just read + # straight from the disk. The stream creation process + # should be in its own thread anyway so we don't need to + # worry about interacting with the twisted reactor + file_sender = FileSender() + yield file_sender.beginFileTransfer(file_handle, lbry_file_creator) - d = lbry_file_creator.setup() - d.addCallback(lambda _: start_stream()) - return d + log.debug("the file sender has triggered its deferred. stopping the stream writer") + yield lbry_file_creator.stop() + + log.debug("making the sd blob") + sd_info = lbry_file_creator.sd_info + descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager) + sd_hash = yield descriptor_writer.create_descriptor(sd_info) + + log.debug("saving the stream") + yield session.storage.store_stream( + sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'], + sd_info['suggested_file_name'], sd_info['blobs'] + ) + log.debug("adding to the file manager") + lbry_file = yield lbry_file_manager.add_published_file( + sd_info['stream_hash'], sd_hash, binascii.hexlify(file_directory), session.payment_rate_manager, + session.payment_rate_manager.min_blob_data_payment_rate + ) + defer.returnValue(lbry_file) def hexlify(str_or_unicode): diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 6e7491be6..2e2a054c1 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -2,18 +2,18 @@ Download LBRY Files from LBRYnet and save them to disk. """ import logging +import binascii from zope.interface import implements from twisted.internet import defer from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.utils import short_hash -from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaver from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileDownloader from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory -from lbrynet.lbry_file.StreamDescriptor import save_sd_info +from lbrynet.core.StreamDescriptor import save_sd_info log = logging.getLogger(__name__) @@ -35,19 +35,41 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): STATUS_STOPPED = "stopped" STATUS_FINISHED = "finished" - def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, - stream_info_manager, lbry_file_manager, payment_rate_manager, wallet, - download_directory, sd_hash=None, key=None, stream_name=None, - suggested_file_name=None): - EncryptedFileSaver.__init__(self, stream_hash, peer_finder, - rate_limiter, blob_manager, - stream_info_manager, - payment_rate_manager, wallet, - download_directory, key, stream_name, suggested_file_name) + def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage, lbry_file_manager, + payment_rate_manager, wallet, download_directory, file_name, stream_name, sd_hash, key, + suggested_file_name): + EncryptedFileSaver.__init__( + self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet, + download_directory, key, stream_name, file_name + ) self.sd_hash = sd_hash self.rowid = rowid + self.suggested_file_name = binascii.unhexlify(suggested_file_name) self.lbry_file_manager = lbry_file_manager self._saving_status = False + self.claim_id = None + self.outpoint = None + self.claim_name = None + self.txid = None + self.nout = None + self.channel_claim_id = None + self.channel_name = None + self.metadata = None + + @defer.inlineCallbacks + def get_claim_info(self, include_supports=True): + claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports) + if claim_info: + self.claim_id = claim_info['claim_id'] + self.txid = claim_info['txid'] + self.nout = claim_info['nout'] + self.channel_claim_id = claim_info['channel_claim_id'] + self.outpoint = "%s:%i" % (self.txid, self.nout) + self.claim_name = claim_info['name'] + self.channel_name = claim_info['channel_name'] + self.metadata = claim_info['value']['stream']['metadata'] + + defer.returnValue(claim_info) @property def saving_status(self): @@ -77,8 +99,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): @defer.inlineCallbacks def status(self): - blobs = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - blob_hashes = [b[0] for b in blobs if b[0] is not None] + blobs = yield self.storage.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes) num_blobs_completed = len(completed_blobs) num_blobs_known = len(blob_hashes) @@ -89,8 +111,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): status = "stopped" else: status = "running" - defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed, - num_blobs_known, status)) + defer.returnValue(EncryptedFileStatusReport( + self.file_name, num_blobs_completed, num_blobs_known, status + )) @defer.inlineCallbacks def _start(self): @@ -137,19 +160,16 @@ class ManagedEncryptedFileDownloaderFactory(object): return True @defer.inlineCallbacks - def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None): - assert len(options) == 1 - data_rate = options[0] - stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager, + def make_downloader(self, metadata, data_rate, payment_rate_manager, download_directory, file_name=None): + stream_hash = yield save_sd_info(self.lbry_file_manager.session.blob_manager, + metadata.source_blob_hash, metadata.validator.raw_info) - if metadata.metadata_source == StreamMetadata.FROM_BLOB: - yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash, - metadata.source_blob_hash) - lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, - metadata.source_blob_hash, - payment_rate_manager, - data_rate, - download_directory) + if file_name: + file_name = binascii.hexlify(file_name) + lbry_file = yield self.lbry_file_manager.add_downloaded_file( + stream_hash, metadata.source_blob_hash, binascii.hexlify(download_directory), payment_rate_manager, + data_rate, file_name=file_name + ) defer.returnValue(lbry_file) @staticmethod diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index a1ffc0da3..cf08be90c 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -1,9 +1,8 @@ """ Keep track of which LBRY Files are downloading and store their LBRY File specific metadata """ - -import logging import os +import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure @@ -12,7 +11,7 @@ from lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType, get_sd_info +from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call @@ -30,38 +29,33 @@ class EncryptedFileManager(object): # when reflecting files, reflect up to this many files at a time CONCURRENT_REFLECTS = 5 - def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None): + def __init__(self, session, sd_identifier): self.auto_re_reflect = conf.settings['reflect_uploads'] self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval'] self.session = session - self.stream_info_manager = stream_info_manager + self.storage = session.storage # TODO: why is sd_identifier part of the file manager? self.sd_identifier = sd_identifier + assert sd_identifier self.lbry_files = [] - if download_directory: - self.download_directory = download_directory - else: - self.download_directory = os.getcwd() self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) - log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory)) @defer.inlineCallbacks def setup(self): - yield self.stream_info_manager.setup() yield self._add_to_sd_identifier() yield self._start_lbry_files() log.info("Started file manager") def get_lbry_file_status(self, lbry_file): - return self._get_lbry_file_status(lbry_file.rowid) + return self.session.storage.get_lbry_file_status(lbry_file.rowid) def set_lbry_file_data_payment_rate(self, lbry_file, new_rate): - return self._set_lbry_file_payment_rate(lbry_file.rowid, new_rate) + return self.session.storage(lbry_file.rowid, new_rate) def change_lbry_file_status(self, lbry_file, status): log.debug("Changing status of %s to %s", lbry_file.stream_hash, status) - return self._change_file_status(lbry_file.rowid, status) + return self.session.storage.change_file_status(lbry_file.rowid, status) def get_lbry_file_status_reports(self): ds = [] @@ -77,59 +71,55 @@ class EncryptedFileManager(object): dl.addCallback(filter_failures) return dl - def save_sd_blob_hash_to_stream(self, stream_hash, sd_hash): - return self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_hash) - def _add_to_sd_identifier(self): downloader_factory = ManagedEncryptedFileDownloaderFactory(self) self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key, - stream_name, suggested_file_name, download_directory=None): - download_directory = download_directory or self.download_directory - payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager + stream_name, file_name, download_directory, suggested_file_name): return ManagedEncryptedFileDownloader( rowid, stream_hash, self.session.peer_finder, self.session.rate_limiter, self.session.blob_manager, - self.stream_info_manager, + self.session.storage, self, payment_rate_manager, self.session.wallet, download_directory, + file_name, + stream_name=stream_name, sd_hash=sd_hash, key=key, - stream_name=stream_name, suggested_file_name=suggested_file_name ) @defer.inlineCallbacks def _start_lbry_files(self): - files_and_options = yield self._get_all_lbry_files() - stream_infos = yield self.stream_info_manager._get_all_stream_infos() + files = yield self.session.storage.get_all_lbry_files() b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) - log.info("Trying to start %i files", len(stream_infos)) - for i, (rowid, stream_hash, blob_data_rate, status) in enumerate(files_and_options): - if len(files_and_options) > 500 and i % 500 == 0: - log.info("Started %i/%i files", i, len(stream_infos)) - if stream_hash in stream_infos: - lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, - stream_infos[stream_hash]['sd_hash'], - stream_infos[stream_hash]['key'], - stream_infos[stream_hash]['stream_name'], - stream_infos[stream_hash]['suggested_file_name']) - log.info("initialized file %s", lbry_file.stream_name) - try: - # restore will raise an Exception if status is unknown - lbry_file.restore(status) - self.lbry_files.append(lbry_file) - except Exception: - log.warning("Failed to start %i", rowid) - continue + + log.info("Trying to start %i files", len(files)) + for i, file_info in enumerate(files): + if len(files) > 500 and i % 500 == 0: + log.info("Started %i/%i files", i, len(files)) + + lbry_file = self._get_lbry_file( + file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], + file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], + file_info['suggested_file_name'] + ) + yield lbry_file.get_claim_info() + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.lbry_files.append(lbry_file) + except Exception: + log.warning("Failed to start %i", file_info['rowid']) + continue log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) @@ -157,17 +147,46 @@ class EncryptedFileManager(object): yield self._stop_lbry_file(lbry_file) @defer.inlineCallbacks - def add_lbry_file(self, stream_hash, sd_hash, payment_rate_manager=None, blob_data_rate=None, - download_directory=None, status=None): - rowid = yield self._save_lbry_file(stream_hash, blob_data_rate) - stream_metadata = yield get_sd_info(self.stream_info_manager, - stream_hash, False) + def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate): + status = ManagedEncryptedFileDownloader.STATUS_FINISHED + stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False) key = stream_metadata['key'] stream_name = stream_metadata['stream_name'] - suggested_file_name = stream_metadata['suggested_file_name'] - lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, sd_hash, key, - stream_name, suggested_file_name, download_directory) - lbry_file.restore(status or ManagedEncryptedFileDownloader.STATUS_STOPPED) + file_name = stream_metadata['suggested_file_name'] + rowid = yield self.storage.save_published_file( + stream_hash, file_name, download_directory, blob_data_rate, status + ) + lbry_file = self._get_lbry_file( + rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, + stream_metadata['suggested_file_name'] + ) + lbry_file.restore(status) + self.lbry_files.append(lbry_file) + defer.returnValue(lbry_file) + + @defer.inlineCallbacks + def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None, + blob_data_rate=None, status=None, file_name=None): + status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED + payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager + blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate + stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False) + key = stream_metadata['key'] + stream_name = stream_metadata['stream_name'] + file_name = file_name or stream_metadata['suggested_file_name'] + + # when we save the file we'll atomic touch the nearest file to the suggested file name + # that doesn't yet exist in the download directory + rowid = yield self.storage.save_downloaded_file( + stream_hash, os.path.basename(file_name.decode('hex')).encode('hex'), download_directory, blob_data_rate + ) + file_name = yield self.session.storage.get_filename_for_rowid(rowid) + lbry_file = self._get_lbry_file( + rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, + stream_metadata['suggested_file_name'] + ) + lbry_file.get_claim_info(include_supports=False) + lbry_file.restore(status) self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @@ -191,22 +210,8 @@ class EncryptedFileManager(object): self.lbry_files.remove(lbry_file) - yield self._delete_lbry_file_options(lbry_file.rowid) - yield lbry_file.delete_data() - - # TODO: delete this - # get count for stream hash returns the count of the lbry files with the stream hash - # in the lbry_file_options table, which will soon be removed. - - stream_count = yield self.get_count_for_stream_hash(lbry_file.stream_hash) - if stream_count == 0: - yield self.stream_info_manager.delete_stream(lbry_file.stream_hash) - else: - msg = ("Can't delete stream info for %s, count is %i\n" - "The call that resulted in this warning will\n" - "be removed in the database refactor") - log.warning(msg, lbry_file.stream_hash, stream_count) + yield self.session.storage.delete_stream(lbry_file.stream_hash) if delete_file and os.path.isfile(full_path): os.remove(full_path) @@ -234,30 +239,3 @@ class EncryptedFileManager(object): yield defer.DeferredList(list(self._stop_lbry_files())) log.info("Stopped encrypted file manager") defer.returnValue(True) - - def get_count_for_stream_hash(self, stream_hash): - return self._get_count_for_stream_hash(stream_hash) - - def _get_count_for_stream_hash(self, stream_hash): - return self.stream_info_manager._get_count_for_stream_hash(stream_hash) - - def _delete_lbry_file_options(self, rowid): - return self.stream_info_manager._delete_lbry_file_options(rowid) - - def _save_lbry_file(self, stream_hash, data_payment_rate): - return self.stream_info_manager._save_lbry_file(stream_hash, data_payment_rate) - - def _get_all_lbry_files(self): - return self.stream_info_manager._get_all_lbry_files() - - def _get_rowid_for_stream_hash(self, stream_hash): - return self.stream_info_manager._get_rowid_for_stream_hash(stream_hash) - - def _change_file_status(self, rowid, status): - return self.stream_info_manager._change_file_status(rowid, status) - - def _set_lbry_file_payment_rate(self, rowid, new_rate): - return self.stream_info_manager._set_lbry_file_payment_rate(rowid, new_rate) - - def _get_lbry_file_status(self, rowid): - return self.stream_info_manager._get_lbry_file_status(rowid) diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py deleted file mode 100644 index be189fdb6..000000000 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ /dev/null @@ -1,378 +0,0 @@ -import os -import logging -import sqlite3 -from twisted.internet import defer -from twisted.python.failure import Failure -from twisted.enterprise import adbapi -from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash -from lbrynet.core.sqlite_helpers import rerun_if_locked -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader - -log = logging.getLogger(__name__) - - -class DBEncryptedFileMetadataManager(object): - """Store and provide access to LBRY file metadata using sqlite""" - - def __init__(self, db_dir, file_name=None): - self.db_dir = db_dir - self._db_file_name = file_name or "lbryfile_info.db" - self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir, - self._db_file_name), - check_same_thread=False) - - def setup(self): - return self._open_db() - - def stop(self): - self.db_conn.close() - return defer.succeed(True) - - def get_all_streams(self): - return self._get_all_streams() - - def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs): - d = self._store_stream(stream_hash, file_name, key, suggested_file_name) - d.addCallback(lambda _: self.add_blobs_to_stream(stream_hash, blobs)) - return d - - def get_stream_info(self, stream_hash): - return self._get_stream_info(stream_hash) - - def check_if_stream_exists(self, stream_hash): - return self._check_if_stream_exists(stream_hash) - - def delete_stream(self, stream_hash): - return self._delete_stream(stream_hash) - - def add_blobs_to_stream(self, stream_hash, blobs): - return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) - - def get_blobs_for_stream(self, stream_hash, start_blob=None, - end_blob=None, count=None, reverse=False): - log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count) - - def get_positions_of_start_and_end(): - if start_blob is not None: - d1 = self._get_blob_num_by_hash(stream_hash, start_blob) - else: - d1 = defer.succeed(None) - if end_blob is not None: - d2 = self._get_blob_num_by_hash(stream_hash, end_blob) - else: - d2 = defer.succeed(None) - - dl = defer.DeferredList([d1, d2]) - - def get_positions(results): - start_num = None - end_num = None - if results[0][0] is True: - start_num = results[0][1] - if results[1][0] is True: - end_num = results[1][1] - return start_num, end_num - - dl.addCallback(get_positions) - return dl - - def get_blob_infos(nums): - start_num, end_num = nums - return self._get_further_blob_infos(stream_hash, start_num, end_num, - count, reverse) - - d = get_positions_of_start_and_end() - d.addCallback(get_blob_infos) - return d - - def get_stream_of_blob(self, blob_hash): - return self._get_stream_of_blobhash(blob_hash) - - def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) - - def get_sd_blob_hashes_for_stream(self, stream_hash): - return self._get_sd_blob_hashes_for_stream(stream_hash) - - def get_stream_hash_for_sd_hash(self, sd_hash): - return self._get_stream_hash_for_sd_blob_hash(sd_hash) - - @staticmethod - def _create_tables(transaction): - transaction.execute("create table if not exists lbry_files (" + - " stream_hash text primary key, " + - " key text, " + - " stream_name text, " + - " suggested_file_name text" + - ")") - transaction.execute("create table if not exists lbry_file_blobs (" + - " blob_hash text, " + - " stream_hash text, " + - " position integer, " + - " iv text, " + - " length integer, " + - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") - transaction.execute("create table if not exists lbry_file_descriptors (" + - " sd_blob_hash TEXT PRIMARY KEY, " + - " stream_hash TEXT, " + - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") - transaction.execute("create table if not exists lbry_file_options (" + - " blob_data_rate real, " + - " status text," + - " stream_hash text," - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") - transaction.execute("create table if not exists lbry_file_metadata (" + - " lbry_file integer primary key, " + - " txid text, " + - " n integer, " + - " foreign key(lbry_file) references lbry_files(rowid)" - ")") - - def _open_db(self): - # check_same_thread=False is solely to quiet a spurious error that appears to be due - # to a bug in twisted, where the connection is closed by a different thread than the - # one that opened it. The individual connections in the pool are not used in multiple - # threads. - return self.db_conn.runInteraction(self._create_tables) - - @rerun_if_locked - @defer.inlineCallbacks - def get_file_outpoint(self, rowid): - result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata " - "where lbry_file=?", (rowid, )) - response = None - if result: - txid, nout = result[0] - if txid is not None and nout is not None: - response = "%s:%i" % (txid, nout) - defer.returnValue(response) - - @rerun_if_locked - @defer.inlineCallbacks - def save_outpoint_to_file(self, rowid, txid, nout): - existing_outpoint = yield self.get_file_outpoint(rowid) - if not existing_outpoint: - yield self.db_conn.runOperation("insert into lbry_file_metadata values " - "(?, ?, ?)", (rowid, txid, nout)) - - @rerun_if_locked - def _delete_stream(self, stream_hash): - d = self.db_conn.runQuery( - "select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) - d.addCallback( - lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash))) - - def do_delete(transaction, row_id, s_h): - transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) - transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,)) - transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,)) - transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,)) - - d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h)) - return d - - @rerun_if_locked - def _store_stream(self, stream_hash, name, key, suggested_file_name): - d = self.db_conn.runQuery("insert into lbry_files values (?, ?, ?, ?)", - (stream_hash, key, name, suggested_file_name)) - - def check_duplicate(err): - if err.check(sqlite3.IntegrityError): - raise DuplicateStreamHashError(stream_hash) - return err - - d.addErrback(check_duplicate) - return d - - @rerun_if_locked - def _get_all_streams(self): - d = self.db_conn.runQuery("select stream_hash from lbry_files") - d.addCallback(lambda results: [r[0] for r in results]) - return d - - @rerun_if_locked - def _get_stream_info(self, stream_hash): - def get_result(res): - if res: - return res[0] - else: - raise NoSuchStreamHash(stream_hash) - - d = self.db_conn.runQuery( - "select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?", - (stream_hash,)) - d.addCallback(get_result) - return d - - @rerun_if_locked - @defer.inlineCallbacks - def _get_all_stream_infos(self): - file_results = yield self.db_conn.runQuery("select rowid, * from lbry_files") - descriptor_results = yield self.db_conn.runQuery("select stream_hash, sd_blob_hash " - "from lbry_file_descriptors") - response = {} - for (stream_hash, sd_hash) in descriptor_results: - if stream_hash in response: - log.warning("Duplicate stream %s (sd: %s)", stream_hash, sd_hash[:16]) - continue - response[stream_hash] = { - 'sd_hash': sd_hash - } - for (rowid, stream_hash, key, stream_name, suggested_file_name) in file_results: - if stream_hash not in response: - log.warning("Missing sd hash for %s", stream_hash) - continue - response[stream_hash]['rowid'] = rowid - response[stream_hash]['key'] = key - response[stream_hash]['stream_name'] = stream_name - response[stream_hash]['suggested_file_name'] = suggested_file_name - defer.returnValue(response) - - @rerun_if_locked - def _check_if_stream_exists(self, stream_hash): - d = self.db_conn.runQuery( - "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda r: True if len(r) else False) - return d - - @rerun_if_locked - def _get_blob_num_by_hash(self, stream_hash, blob_hash): - d = self.db_conn.runQuery( - "select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?", - (stream_hash, blob_hash)) - d.addCallback(lambda r: r[0][0] if len(r) else None) - return d - - @rerun_if_locked - def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - params = [] - q_string = "select * from (" - q_string += " select blob_hash, position, iv, length from lbry_file_blobs " - q_string += " where stream_hash = ? " - params.append(stream_hash) - if start_num is not None: - q_string += " and position > ? " - params.append(start_num) - if end_num is not None: - q_string += " and position < ? " - params.append(end_num) - q_string += " order by position " - if reverse is True: - q_string += " DESC " - if count is not None: - q_string += " limit ? " - params.append(count) - q_string += ") order by position" - # Order by position is done twice so that it always returns them from lowest position to - # greatest, but the limit by clause can select the 'count' greatest or 'count' least - return self.db_conn.runQuery(q_string, tuple(params)) - - @rerun_if_locked - def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False): - - def add_blobs(transaction): - for blob_info in blob_infos: - try: - transaction.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)", - (blob_info.blob_hash, stream_hash, blob_info.blob_num, - blob_info.iv, blob_info.length)) - except sqlite3.IntegrityError: - if ignore_duplicate_error is False: - raise - - return self.db_conn.runInteraction(add_blobs) - - @rerun_if_locked - def _get_stream_of_blobhash(self, blob_hash): - d = self.db_conn.runQuery("select stream_hash from lbry_file_blobs where blob_hash = ?", - (blob_hash,)) - d.addCallback(lambda r: r[0][0] if len(r) else None) - return d - - @rerun_if_locked - def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - d = self.db_conn.runOperation("insert or ignore into lbry_file_descriptors values (?, ?)", - (sd_blob_hash, stream_hash)) - d.addCallback(lambda _: log.info("Saved sd blob hash %s to stream hash %s", - str(sd_blob_hash), str(stream_hash))) - return d - - @rerun_if_locked - def _get_sd_blob_hashes_for_stream(self, stream_hash): - log.debug("Looking up sd blob hashes for stream hash %s", str(stream_hash)) - d = self.db_conn.runQuery( - "select sd_blob_hash from lbry_file_descriptors where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda results: [r[0] for r in results]) - return d - - @rerun_if_locked - def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash): - def _handle_result(result): - if not result: - raise NoSuchSDHash(sd_blob_hash) - return result[0][0] - - log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash)) - d = self.db_conn.runQuery( - "select stream_hash from lbry_file_descriptors where sd_blob_hash = ?", - (sd_blob_hash,)) - d.addCallback(_handle_result) - return d - - # used by lbry file manager - @rerun_if_locked - def _save_lbry_file(self, stream_hash, data_payment_rate): - def do_save(db_transaction): - row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash) - db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row) - return db_transaction.lastrowid - return self.db_conn.runInteraction(do_save) - - @rerun_if_locked - def _delete_lbry_file_options(self, rowid): - return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?", - (rowid,)) - - @rerun_if_locked - def _set_lbry_file_payment_rate(self, rowid, new_rate): - return self.db_conn.runQuery( - "update lbry_file_options set blob_data_rate = ? where rowid = ?", - (new_rate, rowid)) - - @rerun_if_locked - def _get_all_lbry_files(self): - d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate, status " - "from lbry_file_options") - return d - - @rerun_if_locked - def _change_file_status(self, rowid, new_status): - d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?", - (new_status, rowid)) - d.addCallback(lambda _: new_status) - return d - - @rerun_if_locked - def _get_lbry_file_status(self, rowid): - d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?", - (rowid,)) - d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d - - @rerun_if_locked - def _get_count_for_stream_hash(self, stream_hash): - d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda r: (r[0][0] if r else 0)) - return d - - @rerun_if_locked - def _get_rowid_for_stream_hash(self, stream_hash): - d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index 17b3cf501..9e5d218a0 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -2,10 +2,9 @@ import binascii from zope.interface import implements -from lbrynet.lbry_file.StreamDescriptor import save_sd_info +from lbrynet.core.StreamDescriptor import save_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager -from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler import os @@ -21,39 +20,21 @@ class EncryptedFileDownloader(CryptStreamDownloader): """Classes which inherit from this class download LBRY files""" def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, - stream_info_manager, payment_rate_manager, wallet, key, stream_name, - suggested_file_name=None): + storage, payment_rate_manager, wallet, key, stream_name, file_name): CryptStreamDownloader.__init__(self, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet, key, stream_name) self.stream_hash = stream_hash - self.stream_info_manager = stream_info_manager - self.suggested_file_name = binascii.unhexlify(suggested_file_name) + self.storage = storage + self.file_name = binascii.unhexlify(os.path.basename(file_name)) self._calculated_total_bytes = None + @defer.inlineCallbacks def delete_data(self): - d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - - def get_blob_hashes(blob_infos): - return [b[0] for b in blob_infos if b[0] is not None] - - d1.addCallback(get_blob_hashes) - d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) - - def combine_blob_hashes(results): - blob_hashes = [] - for success, result in results: - if success is True: - blob_hashes.extend(result) - return blob_hashes - - def delete_blobs(blob_hashes): - self.blob_manager.delete_blobs(blob_hashes) - return True - - dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) - dl.addCallback(combine_blob_hashes) - dl.addCallback(delete_blobs) - return dl + crypt_infos = yield self.storage.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b.blob_hash for b in crypt_infos if b.blob_hash] + sd_hash = yield self.storage.get_sd_blob_hash_for_stream(self.stream_hash) + blob_hashes.append(sd_hash) + yield self.blob_manager.delete_blobs(blob_hashes) def stop(self, err=None): d = self._close_output() @@ -76,10 +57,10 @@ class EncryptedFileDownloader(CryptStreamDownloader): pass def get_total_bytes(self): - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + d = self.storage.get_blobs_for_stream(self.stream_hash) def calculate_size(blobs): - return sum([b[3] for b in blobs]) + return sum([b.length for b in blobs]) d.addCallback(calculate_size) return d @@ -106,18 +87,17 @@ class EncryptedFileDownloader(CryptStreamDownloader): def _get_metadata_handler(self, download_manager): return EncryptedFileMetadataHandler(self.stream_hash, - self.stream_info_manager, download_manager) + self.storage, download_manager) class EncryptedFileDownloaderFactory(object): implements(IStreamDownloaderFactory) - def __init__(self, peer_finder, rate_limiter, blob_manager, stream_info_manager, - wallet): + def __init__(self, peer_finder, rate_limiter, blob_manager, storage, wallet): self.peer_finder = peer_finder self.rate_limiter = rate_limiter self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager + self.storage = storage self.wallet = wallet def can_download(self, sd_validator): @@ -129,22 +109,14 @@ class EncryptedFileDownloaderFactory(object): payment_rate_manager.min_blob_data_payment_rate = data_rate def save_source_if_blob(stream_hash): - if metadata.metadata_source == StreamMetadata.FROM_BLOB: - d = self.stream_info_manager.save_sd_blob_hash_to_stream( - stream_hash, metadata.source_blob_hash) - else: - d = defer.succeed(True) - d.addCallback(lambda _: stream_hash) - return d + return defer.succeed(metadata.source_blob_hash) def create_downloader(stream_hash): downloader = self._make_downloader(stream_hash, payment_rate_manager, metadata.validator.raw_info) - d = downloader.set_stream_info() - d.addCallback(lambda _: downloader) - return d + return defer.succeed(downloader) - d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info) + d = save_sd_info(self.blob_manager, metadata.source_blob_hash, metadata.validator.raw_info) d.addCallback(save_source_if_blob) d.addCallback(create_downloader) return d @@ -154,26 +126,20 @@ class EncryptedFileDownloaderFactory(object): class EncryptedFileSaver(EncryptedFileDownloader): - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet, download_directory, key, stream_name, - suggested_file_name): + def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet, + download_directory, key, stream_name, file_name): EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, - blob_manager, stream_info_manager, payment_rate_manager, - wallet, key, stream_name, suggested_file_name) - self.download_directory = download_directory - self.file_name = os.path.basename(self.suggested_file_name) - self.file_written_to = None + blob_manager, storage, payment_rate_manager, + wallet, key, stream_name, file_name) + self.download_directory = binascii.unhexlify(download_directory) + self.file_written_to = os.path.join(self.download_directory, binascii.unhexlify(file_name)) self.file_handle = None def __str__(self): - if self.file_written_to is not None: - return str(self.file_written_to) - else: - return str(self.file_name) + return str(self.file_written_to) def stop(self, err=None): d = EncryptedFileDownloader.stop(self, err=err) - d.addCallback(lambda _: self._delete_from_info_manager()) return d def _get_progress_manager(self, download_manager): @@ -184,34 +150,16 @@ class EncryptedFileSaver(EncryptedFileDownloader): def _setup_output(self): def open_file(): if self.file_handle is None: - file_name = self.file_name - if not file_name: - file_name = "_" - if os.path.exists(os.path.join(self.download_directory, file_name)): - ext_num = 1 - - def _get_file_name(ext): - if len(file_name.split(".")): - fn = ''.join(file_name.split(".")[:-1]) - file_ext = ''.join(file_name.split(".")[-1]) - return fn + "-" + str(ext) + "." + file_ext - else: - return file_name + "_" + str(ext) - - while os.path.exists(os.path.join(self.download_directory, - _get_file_name(ext_num))): - ext_num += 1 - - file_name = _get_file_name(ext_num) + file_written_to = os.path.join(self.download_directory, self.file_name) try: - self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb') - self.file_written_to = os.path.join(self.download_directory, file_name) + self.file_handle = open(file_written_to, 'wb') + self.file_written_to = file_written_to except IOError: log.error(traceback.format_exc()) raise ValueError( "Failed to open %s. Make sure you have permission to save files to that" - " location." % - os.path.join(self.download_directory, file_name)) + " location." % file_written_to + ) return threads.deferToThread(open_file) def _close_output(self): @@ -232,26 +180,20 @@ class EncryptedFileSaver(EncryptedFileDownloader): self.file_handle.write(data) return write_func - def _delete_from_info_manager(self): - return self.stream_info_manager.delete_stream(self.stream_hash) - class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): - def __init__(self, peer_finder, rate_limiter, blob_manager, stream_info_manager, - wallet, download_directory): - EncryptedFileDownloaderFactory.__init__(self, peer_finder, rate_limiter, blob_manager, - stream_info_manager, wallet) - self.download_directory = download_directory + def __init__(self, peer_finder, rate_limiter, blob_manager, storage, wallet, download_directory): + EncryptedFileDownloaderFactory.__init__(self, peer_finder, rate_limiter, blob_manager, storage, wallet) + self.download_directory = binascii.hexlify(download_directory) def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): stream_name = stream_info.raw_info['stream_name'] key = stream_info.raw_info['key'] suggested_file_name = stream_info.raw_info['suggested_file_name'] - return EncryptedFileSaver(stream_hash, self.peer_finder, self.rate_limiter, - self.blob_manager, self.stream_info_manager, - payment_rate_manager, self.wallet, self.download_directory, - key=key, stream_name=stream_name, - suggested_file_name=suggested_file_name) + return EncryptedFileSaver( + stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.storage, payment_rate_manager, + self.wallet, self.download_directory, key=key, stream_name=stream_name, file_name=suggested_file_name + ) @staticmethod def get_description(): diff --git a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py index 116ac7080..51105c12b 100644 --- a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py +++ b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py @@ -1,7 +1,6 @@ import logging from zope.interface import implements from twisted.internet import defer -from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.interfaces import IMetadataHandler @@ -11,9 +10,9 @@ log = logging.getLogger(__name__) class EncryptedFileMetadataHandler(object): implements(IMetadataHandler) - def __init__(self, stream_hash, stream_info_manager, download_manager): + def __init__(self, stream_hash, storage, download_manager): self.stream_hash = stream_hash - self.stream_info_manager = stream_info_manager + self.storage = storage self.download_manager = download_manager self._final_blob_num = None @@ -21,7 +20,7 @@ class EncryptedFileMetadataHandler(object): @defer.inlineCallbacks def get_initial_blobs(self): - blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_infos = yield self.storage.get_blobs_for_stream(self.stream_hash) formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos) defer.returnValue(formatted_infos) @@ -32,12 +31,13 @@ class EncryptedFileMetadataHandler(object): def _format_initial_blobs_for_download_manager(self, blob_infos): infos = [] - for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos): - if blob_hash is not None and length: - infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv)) + for i, crypt_blob in enumerate(blob_infos): + if crypt_blob.blob_hash is not None and crypt_blob.length: + infos.append(crypt_blob) else: if i != len(blob_infos) - 1: - raise Exception("Invalid stream terminator") - log.debug("Setting _final_blob_num to %s", str(blob_num - 1)) - self._final_blob_num = blob_num - 1 + raise Exception("Invalid stream terminator: %i of %i" % + (i, len(blob_infos) - 1)) + log.debug("Setting _final_blob_num to %s", str(crypt_blob.blob_num - 1)) + self._final_blob_num = crypt_blob.blob_num - 1 return infos