diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f11b5f30..bd6f2f03b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ at anytime. * `get` failing with a non-useful error message when given a uri for a channel claim * exception checking in several wallet unit tests * daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method - * + * incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams (https://github.com/lbryio/lbry/issues/1124) ### Deprecated * `channel_list_mine`, replaced with `channel_list` diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index f9495bec4..44a760bf8 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -1,4 +1,3 @@ -import os import binascii from collections import defaultdict import json @@ -335,25 +334,6 @@ def get_sd_info(storage, stream_hash, include_blobs): ) -@defer.inlineCallbacks -def create_plain_sd(storage, stream_hash, file_name, overwrite_existing=False): - def _get_file_name(): - actual_file_name = file_name - if os.path.exists(actual_file_name): - ext_num = 1 - while os.path.exists(actual_file_name + "_" + str(ext_num)): - ext_num += 1 - actual_file_name = actual_file_name + "_" + str(ext_num) - return actual_file_name - - if overwrite_existing is False: - file_name = yield threads.deferToThread(_get_file_name()) - descriptor_writer = PlainStreamDescriptorWriter(file_name) - sd_info = yield get_sd_info(storage, stream_hash, True) - sd_hash = yield descriptor_writer.create_descriptor(sd_info) - defer.returnValue(sd_hash) - - def get_blob_hashsum(b): length = b['length'] if length != 0: @@ -377,13 +357,14 @@ def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos): h.update(key) h.update(hex_suggested_file_name) blobs_hashsum = get_lbry_hash_obj() - sorted_blob_infos = sorted(blob_infos, key=lambda x: x['blob_num']) - for blob in sorted_blob_infos: - blobs_hashsum.update(get_blob_hashsum(blob)) - if sorted_blob_infos[-1]['length'] != 0: + if any(blob['length'] for blob in blob_infos if blob['length'] <= 0): + raise InvalidStreamDescriptorError("Contains invalid length data blobs") + if blob_infos[-1]['length'] != 0: raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") - if 'blob_hash' in sorted_blob_infos[-1]: + if 'blob_hash' in blob_infos[-1]: raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") + for blob in blob_infos: + blobs_hashsum.update(get_blob_hashsum(blob)) h.update(blobs_hashsum.digest()) return h.hexdigest() diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index e9a380ed7..e39a50c1d 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -86,10 +86,9 @@ class CryptStreamCreator(object): self.stopped = True if self.current_blob is not None: self._close_current_blob() - self._finalize() - dl = defer.DeferredList(self.finished_deferreds) - dl.addCallback(lambda _: self._finished()) - return dl + d = self._finalize() + d.addCallback(lambda _: self._finished()) + return d # TODO: move the stream creation process to its own thread and # remove the reactor from this process. @@ -112,6 +111,7 @@ class CryptStreamCreator(object): return defer.succeed(True) + @defer.inlineCallbacks def _finalize(self): """ Finalize a stream by adding an empty @@ -119,14 +119,14 @@ class CryptStreamCreator(object): the stream has ended. This empty blob is not saved to the blob manager """ - log.debug("_finalize has been called") + + yield defer.DeferredList(self.finished_deferreds) self.blob_count += 1 iv = self.iv_generator.next() - final_blob_creator = self.blob_manager.get_blob_creator() - final_blob = self._get_blob_maker(iv, final_blob_creator) - d = final_blob.close() - d.addCallback(self._blob_finished) - self.finished_deferreds.append(d) + final_blob = self._get_blob_maker(iv, self.blob_manager.get_blob_creator()) + stream_terminator = yield final_blob.close() + terminator_info = yield self._blob_finished(stream_terminator) + defer.returnValue(terminator_info) def _write(self, data): while len(data) > 0: diff --git a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py index 4ebf36f4f..05c0a8feb 100644 --- a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py @@ -5,6 +5,7 @@ from twisted.trial import unittest from twisted.internet import defer from lbrynet.database.storage import SQLiteStorage +from lbrynet.core.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader from lbrynet.core import BlobManager from lbrynet.core import Session from lbrynet.core.server import DHTHashAnnouncer @@ -15,6 +16,7 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir MB = 2**20 + def iv_generator(): while True: yield '3' * AES.block_size @@ -22,6 +24,7 @@ def iv_generator(): class CreateEncryptedFileTest(unittest.TestCase): timeout = 5 + @defer.inlineCallbacks def setUp(self): mocks.mock_conf_settings(self) @@ -57,16 +60,28 @@ class CreateEncryptedFileTest(unittest.TestCase): def test_can_create_file(self): expected_stream_hash = "41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \ "7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25" - expected_sd_hash = "bc435ae0c4659635e6514e05bb1fcd0d365b234f6f0e78002" \ - "d2576ff84a0b8710a9847757a9aa8cbeda5a8e1aeafa48b" + expected_sd_hash = "db043b44384c149126685990f6bb6563aa565ae331303d522" \ + "c8728fe0534dd06fbcacae92b0891787ad9b68ffc8d20c1" filename = 'test.file' lbry_file = yield self.create_file(filename) sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash) + # read the sd blob file + sd_blob = self.blob_manager.blobs[sd_hash] + sd_reader = BlobStreamDescriptorReader(sd_blob) + sd_file_info = yield sd_reader.get_info() + + # this comes from the database, the blobs returned are sorted + sd_info = yield get_sd_info(self.session.storage, lbry_file.stream_hash, include_blobs=True) + self.assertDictEqual(sd_info, sd_file_info) + self.assertEqual(sd_info['stream_hash'], expected_stream_hash) + self.assertEqual(len(sd_info['blobs']), 3) + self.assertNotEqual(sd_info['blobs'][0]['length'], 0) + self.assertNotEqual(sd_info['blobs'][1]['length'], 0) + self.assertEqual(sd_info['blobs'][2]['length'], 0) self.assertEqual(expected_stream_hash, lbry_file.stream_hash) self.assertEqual(sd_hash, lbry_file.sd_hash) self.assertEqual(sd_hash, expected_sd_hash) - blobs = yield self.blob_manager.get_all_verified_blobs() self.assertEqual(3, len(blobs)) num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()