From f8c33b6acbc3429154e517e281a07aadb3f5bc6b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 12 Feb 2018 13:55:35 -0500 Subject: [PATCH] refactor lbrynet.core.StreamDescriptor -remove lbrynet/lbry_file/StreamDescriptor.py --- lbrynet/core/StreamDescriptor.py | 218 +++++++++++++++++++++++++- lbrynet/lbry_file/StreamDescriptor.py | 185 ---------------------- lbrynet/lbry_file/__init__.py | 2 - 3 files changed, 215 insertions(+), 190 deletions(-) delete mode 100644 lbrynet/lbry_file/StreamDescriptor.py diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index d0bd28310..f9495bec4 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -1,7 +1,11 @@ +import os +import binascii from collections import defaultdict import json import logging + from twisted.internet import threads, defer +from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError @@ -87,7 +91,7 @@ class PlainStreamDescriptorWriter(StreamDescriptorWriter): def _write_stream_descriptor(self, raw_data): def write_file(): - log.debug("Writing the sd file to disk") + log.info("Writing the sd file to disk") with open(self.sd_file_name, 'w') as sd_file: sd_file.write(raw_data) return self.sd_file_name @@ -98,7 +102,6 @@ class PlainStreamDescriptorWriter(StreamDescriptorWriter): class BlobStreamDescriptorWriter(StreamDescriptorWriter): def __init__(self, blob_manager): StreamDescriptorWriter.__init__(self) - self.blob_manager = blob_manager @defer.inlineCallbacks @@ -239,6 +242,208 @@ class StreamDescriptorIdentifier(object): return d +EncryptedFileStreamType = "lbryfile" + + +@defer.inlineCallbacks +def save_sd_info(blob_manager, sd_hash, sd_info): + if not blob_manager.blobs.get(sd_hash) or not blob_manager.blobs[sd_hash].get_is_verified(): + descriptor_writer = BlobStreamDescriptorWriter(blob_manager) + calculated_sd_hash = yield descriptor_writer.create_descriptor(sd_info) + if calculated_sd_hash != sd_hash: + raise InvalidStreamDescriptorError("%s does not match calculated %s" % + (sd_hash, calculated_sd_hash)) + stream_hash = yield blob_manager.storage.get_stream_hash_for_sd_hash(sd_hash) + if not stream_hash: + log.debug("Saving info for %s", sd_info['stream_name'].decode('hex')) + stream_name = sd_info['stream_name'] + key = sd_info['key'] + stream_hash = sd_info['stream_hash'] + stream_blobs = sd_info['blobs'] + suggested_file_name = sd_info['suggested_file_name'] + yield blob_manager.storage.add_known_blobs(stream_blobs) + yield blob_manager.storage.store_stream( + stream_hash, sd_hash, stream_name, key, suggested_file_name, stream_blobs + ) + defer.returnValue(stream_hash) + + +def format_blobs(crypt_blob_infos): + formatted_blobs = [] + for blob_info in crypt_blob_infos: + blob = {} + if blob_info.length != 0: + blob['blob_hash'] = str(blob_info.blob_hash) + blob['blob_num'] = blob_info.blob_num + blob['iv'] = str(blob_info.iv) + blob['length'] = blob_info.length + formatted_blobs.append(blob) + return formatted_blobs + + +def format_sd_info(stream_type, stream_name, key, suggested_file_name, stream_hash, blobs): + return { + "stream_type": stream_type, + "stream_name": stream_name, + "key": key, + "suggested_file_name": suggested_file_name, + "stream_hash": stream_hash, + "blobs": blobs + } + + +@defer.inlineCallbacks +def get_sd_info(storage, stream_hash, include_blobs): + """ + Get an sd info 
dictionary from storage
+
+    :param storage: (SQLiteStorage) storage instance
+    :param stream_hash: (str) stream hash
+    :param include_blobs: (bool) include stream blob infos
+
+    :return: {
+        "stream_type": "lbryfile",
+        "stream_name": <hex encoded stream name>,
+        "key": <stream key>,
+        "suggested_file_name": <hex encoded suggested file name>,
+        "stream_hash": <stream hash>,
+        "blobs": [
+            {
+                "blob_hash": <head blob hash>,
+                "blob_num": 0,
+                "iv": <iv>,
+                "length": <head blob length>
+            }, ...
+            {
+                "blob_num": <terminator blob number>,
+                "iv": <iv>,
+                "length": 0
+            }
+        ]
+    }
+    """
+
+    stream_info = yield storage.get_stream_info(stream_hash)
+    blobs = []
+    if include_blobs:
+        blobs = yield storage.get_blobs_for_stream(stream_hash)
+    defer.returnValue(
+        format_sd_info(
+            EncryptedFileStreamType, stream_info[0], stream_info[1],
+            stream_info[2], stream_hash, format_blobs(blobs)
+        )
+    )
+
+
+@defer.inlineCallbacks
+def create_plain_sd(storage, stream_hash, file_name, overwrite_existing=False):
+    def _get_file_name():
+        actual_file_name = file_name
+        if os.path.exists(actual_file_name):
+            ext_num = 1
+            while os.path.exists(actual_file_name + "_" + str(ext_num)):
+                ext_num += 1
+            actual_file_name = actual_file_name + "_" + str(ext_num)
+        return actual_file_name
+
+    if overwrite_existing is False:
+        file_name = yield threads.deferToThread(_get_file_name())
+    descriptor_writer = PlainStreamDescriptorWriter(file_name)
+    sd_info = yield get_sd_info(storage, stream_hash, True)
+    sd_hash = yield descriptor_writer.create_descriptor(sd_info)
+    defer.returnValue(sd_hash)
+
+
+def get_blob_hashsum(b):
+    length = b['length']
+    if length != 0:
+        blob_hash = b['blob_hash']
+    else:
+        blob_hash = None
+    blob_num = b['blob_num']
+    iv = b['iv']
+    blob_hashsum = get_lbry_hash_obj()
+    if length != 0:
+        blob_hashsum.update(blob_hash)
+    blob_hashsum.update(str(blob_num))
+    blob_hashsum.update(iv)
+    blob_hashsum.update(str(length))
+    return blob_hashsum.digest()
+
+
+def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos):
+    h = get_lbry_hash_obj()
+    h.update(hex_stream_name)
+    h.update(key)
+    h.update(hex_suggested_file_name)
+    blobs_hashsum = get_lbry_hash_obj()
+    sorted_blob_infos = sorted(blob_infos, key=lambda x: x['blob_num'])
+    for blob in sorted_blob_infos:
+        blobs_hashsum.update(get_blob_hashsum(blob))
+    if sorted_blob_infos[-1]['length'] != 0:
+        raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
+    if 'blob_hash' in sorted_blob_infos[-1]:
+        raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
+    h.update(blobs_hashsum.digest())
+    return h.hexdigest()
+
+
+def verify_hex(text, field_name):
+    for c in text:
+        if c not in '0123456789abcdef':
+            raise InvalidStreamDescriptorError("%s is not a hex-encoded string" % field_name)
+
+
+def validate_descriptor(stream_info):
+    try:
+        hex_stream_name = stream_info['stream_name']
+        key = stream_info['key']
+        hex_suggested_file_name = stream_info['suggested_file_name']
+        stream_hash = stream_info['stream_hash']
+        blobs = stream_info['blobs']
+    except KeyError as e:
+        raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0]))
+
+    verify_hex(key, "key")
+    verify_hex(hex_suggested_file_name, "suggested file name")
+    verify_hex(stream_hash, "stream_hash")
+
+    calculated_stream_hash = get_stream_hash(
+        hex_stream_name, key, hex_suggested_file_name, blobs
+    )
+    if calculated_stream_hash != stream_hash:
+        raise InvalidStreamDescriptorError("Stream hash does not match stream metadata")
+    return True
+
+
+class EncryptedFileStreamDescriptorValidator(object):
+    def __init__(self, raw_info):
+        self.raw_info = raw_info
+
+    def validate(self):
+        
return defer.succeed(validate_descriptor(self.raw_info)) + + def info_to_show(self): + info = [] + info.append(("stream_name", binascii.unhexlify(self.raw_info.get("stream_name")))) + size_so_far = 0 + for blob_info in self.raw_info.get("blobs", []): + size_so_far += int(blob_info['length']) + info.append(("stream_size", str(self.get_length_of_stream()))) + suggested_file_name = self.raw_info.get("suggested_file_name", None) + if suggested_file_name is not None: + suggested_file_name = binascii.unhexlify(suggested_file_name) + info.append(("suggested_file_name", suggested_file_name)) + return info + + def get_length_of_stream(self): + size_so_far = 0 + for blob_info in self.raw_info.get("blobs", []): + size_so_far += int(blob_info['length']) + return size_so_far + + +@defer.inlineCallbacks def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): """ Downloads a single blob from the network @@ -251,6 +456,7 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): @return: An object of type HashBlob """ + downloader = StandaloneBlobDownloader(blob_hash, session.blob_manager, session.peer_finder, @@ -258,4 +464,10 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): payment_rate_manager, session.wallet, timeout) - return downloader.download() + sd_blob = yield downloader.download() + sd_reader = BlobStreamDescriptorReader(sd_blob) + sd_info = yield sd_reader.get_info() + raw_sd = yield sd_reader._get_raw_data() + yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)) + yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info) + defer.returnValue(sd_blob) diff --git a/lbrynet/lbry_file/StreamDescriptor.py b/lbrynet/lbry_file/StreamDescriptor.py deleted file mode 100644 index a114acc5f..000000000 --- a/lbrynet/lbry_file/StreamDescriptor.py +++ /dev/null @@ -1,185 +0,0 @@ -import binascii -import logging -from lbrynet.core.cryptoutils import get_lbry_hash_obj -from lbrynet.cryptstream.CryptBlob import CryptBlobInfo -from twisted.internet import defer, threads -from lbrynet.core.Error import DuplicateStreamHashError, InvalidStreamDescriptorError -from lbrynet.core.StreamDescriptor import PlainStreamDescriptorWriter, BlobStreamDescriptorWriter -import os - - -log = logging.getLogger(__name__) - - -EncryptedFileStreamType = "lbryfile" - - -def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False): - log.debug("Saving info for %s", str(sd_info['stream_name'])) - hex_stream_name = sd_info['stream_name'] - key = sd_info['key'] - stream_hash = sd_info['stream_hash'] - raw_blobs = sd_info['blobs'] - suggested_file_name = sd_info['suggested_file_name'] - crypt_blobs = [] - for blob in raw_blobs: - length = blob['length'] - if length != 0: - blob_hash = blob['blob_hash'] - else: - blob_hash = None - blob_num = blob['blob_num'] - iv = blob['iv'] - crypt_blobs.append(CryptBlobInfo(blob_hash, blob_num, length, iv)) - log.debug("Trying to save stream info for %s", str(hex_stream_name)) - d = stream_info_manager.save_stream(stream_hash, hex_stream_name, key, - suggested_file_name, crypt_blobs) - - def check_if_duplicate(err): - if ignore_duplicate is True: - err.trap(DuplicateStreamHashError) - - d.addErrback(check_if_duplicate) - - d.addCallback(lambda _: stream_hash) - return d - - -def get_sd_info(stream_info_manager, stream_hash, include_blobs): - d = stream_info_manager.get_stream_info(stream_hash) - - def format_info(stream_info): - fields = {} - fields['stream_type'] = 
EncryptedFileStreamType - fields['stream_name'] = stream_info[1] - fields['key'] = stream_info[0] - fields['suggested_file_name'] = stream_info[2] - fields['stream_hash'] = stream_hash - - def format_blobs(blobs): - formatted_blobs = [] - for blob_hash, blob_num, iv, length in blobs: - blob = {} - if length != 0: - blob['blob_hash'] = blob_hash - blob['blob_num'] = blob_num - blob['iv'] = iv - blob['length'] = length - formatted_blobs.append(blob) - fields['blobs'] = formatted_blobs - return fields - - if include_blobs is True: - d = stream_info_manager.get_blobs_for_stream(stream_hash) - else: - d = defer.succeed([]) - d.addCallback(format_blobs) - return d - - d.addCallback(format_info) - return d - - -@defer.inlineCallbacks -def publish_sd_blob(stream_info_manager, blob_manager, stream_hash): - descriptor_writer = BlobStreamDescriptorWriter(blob_manager) - sd_info = yield get_sd_info(stream_info_manager, stream_hash, True) - sd_blob_hash = yield descriptor_writer.create_descriptor(sd_info) - yield stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) - defer.returnValue(sd_blob_hash) - - -def create_plain_sd(stream_info_manager, stream_hash, file_name, overwrite_existing=False): - - def _get_file_name(): - actual_file_name = file_name - if os.path.exists(actual_file_name): - ext_num = 1 - while os.path.exists(actual_file_name + "_" + str(ext_num)): - ext_num += 1 - actual_file_name = actual_file_name + "_" + str(ext_num) - return actual_file_name - - if overwrite_existing is False: - d = threads.deferToThread(_get_file_name()) - else: - d = defer.succeed(file_name) - - def do_create(file_name): - descriptor_writer = PlainStreamDescriptorWriter(file_name) - d = get_sd_info(stream_info_manager, stream_hash, True) - d.addCallback(descriptor_writer.create_descriptor) - return d - - d.addCallback(do_create) - return d - - -class EncryptedFileStreamDescriptorValidator(object): - def __init__(self, raw_info): - self.raw_info = raw_info - - def validate(self): - log.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name'])) - try: - hex_stream_name = self.raw_info['stream_name'] - key = self.raw_info['key'] - hex_suggested_file_name = self.raw_info['suggested_file_name'] - stream_hash = self.raw_info['stream_hash'] - blobs = self.raw_info['blobs'] - except KeyError as e: - raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0])) - for c in hex_suggested_file_name: - if c not in '0123456789abcdef': - raise InvalidStreamDescriptorError( - "Suggested file name is not a hex-encoded string") - h = get_lbry_hash_obj() - h.update(hex_stream_name) - h.update(key) - h.update(hex_suggested_file_name) - - def get_blob_hashsum(b): - length = b['length'] - if length != 0: - blob_hash = b['blob_hash'] - else: - blob_hash = None - blob_num = b['blob_num'] - iv = b['iv'] - blob_hashsum = get_lbry_hash_obj() - if length != 0: - blob_hashsum.update(blob_hash) - blob_hashsum.update(str(blob_num)) - blob_hashsum.update(iv) - blob_hashsum.update(str(length)) - return blob_hashsum.digest() - - blobs_hashsum = get_lbry_hash_obj() - for blob in blobs: - blobs_hashsum.update(get_blob_hashsum(blob)) - if blobs[-1]['length'] != 0: - raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") - h.update(blobs_hashsum.digest()) - if h.hexdigest() != stream_hash: - raise InvalidStreamDescriptorError("Stream hash does not match stream metadata") - log.debug("It is validated") - return defer.succeed(True) - - def info_to_show(self): - info = [] - 
info.append(("stream_name", binascii.unhexlify(self.raw_info.get("stream_name")))) - size_so_far = 0 - for blob_info in self.raw_info.get("blobs", []): - size_so_far += int(blob_info['length']) - info.append(("stream_size", str(self.get_length_of_stream()))) - suggested_file_name = self.raw_info.get("suggested_file_name", None) - if suggested_file_name is not None: - suggested_file_name = binascii.unhexlify(suggested_file_name) - info.append(("suggested_file_name", suggested_file_name)) - return info - - def get_length_of_stream(self): - size_so_far = 0 - for blob_info in self.raw_info.get("blobs", []): - size_so_far += int(blob_info['length']) - return size_so_far diff --git a/lbrynet/lbry_file/__init__.py b/lbrynet/lbry_file/__init__.py index a073d3403..e69de29bb 100644 --- a/lbrynet/lbry_file/__init__.py +++ b/lbrynet/lbry_file/__init__.py @@ -1,2 +0,0 @@ -from lbrynet.lbry_file.StreamDescriptor import get_sd_info -from lbrynet.lbry_file.StreamDescriptor import publish_sd_blob