diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 38d83c71e..54aa2cd13 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -47,7 +47,7 @@ class DiskBlobManager(DHTHashSupplier): return self._make_new_blob(blob_hash, length) def get_blob_creator(self): - return self.blob_creator_type(self, self.blob_dir) + return self.blob_creator_type(self.blob_dir) def _make_new_blob(self, blob_hash, length=None): log.debug('Making a new blob for %s', blob_hash) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 4d1521d0b..d80cede96 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -347,8 +347,7 @@ class TempBlob(HashBlob): class HashBlobCreator(object): - def __init__(self, blob_manager): - self.blob_manager = blob_manager + def __init__(self): self._hashsum = get_lbry_hash_obj() self.len_so_far = 0 self.blob_hash = None @@ -365,7 +364,6 @@ class HashBlobCreator(object): self.blob_hash = self._hashsum.hexdigest() d = self._close() if self.blob_hash is not None: - d.addCallback(lambda _: self.blob_manager.creator_finished(self)) d.addCallback(lambda _: self.blob_hash) else: d.addCallback(lambda _: None) @@ -384,8 +382,8 @@ class HashBlobCreator(object): class BlobFileCreator(HashBlobCreator): - def __init__(self, blob_manager, blob_dir): - HashBlobCreator.__init__(self, blob_manager) + def __init__(self, blob_dir): + HashBlobCreator.__init__(self) self.blob_dir = blob_dir self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) @@ -403,8 +401,8 @@ class BlobFileCreator(HashBlobCreator): class TempBlobCreator(HashBlobCreator): - def __init__(self, blob_manager): - HashBlobCreator.__init__(self, blob_manager) + def __init__(self): + HashBlobCreator.__init__(self) # TODO: use StringIO self.data_buffer = '' diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 8c12afe39..4f1d95d26 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -1,7 +1,7 @@ from collections import defaultdict import json import logging -from twisted.internet import threads +from twisted.internet import threads, defer from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError @@ -101,12 +101,15 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter): self.blob_manager = blob_manager + @defer.inlineCallbacks def _write_stream_descriptor(self, raw_data): log.debug("Creating the new blob for the stream descriptor") blob_creator = self.blob_manager.get_blob_creator() blob_creator.write(raw_data) log.debug("Wrote the data to the new blob") - return blob_creator.close() + sd_hash = yield blob_creator.close() + yield self.blob_manager.creator_finished(blob_creator) + defer.returnValue(sd_hash) class StreamMetadata(object): diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index 5993fc2f8..c256c0c75 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -61,18 +61,20 @@ class CryptStreamCreator(StreamCreator): return defer.succeed(True) def _finalize(self): + """ + Finalize a stream by adding an empty + blob at the end, this is to indicate that + the stream has ended. This empty blob is not + saved to the blob manager + """ log.debug("_finalize has been called") self.blob_count += 1 iv = self.iv_generator.next() final_blob_creator = self.blob_manager.get_blob_creator() - log.debug("Created the finished_deferred") final_blob = self._get_blob_maker(iv, final_blob_creator) - log.debug("Created the final blob") - log.debug("Calling close on final blob") d = final_blob.close() d.addCallback(self._blob_finished) self.finished_deferreds.append(d) - log.debug("called close on final blob, returning from make_final_blob") def _write(self, data): def close_blob(blob): @@ -82,14 +84,17 @@ class CryptStreamCreator(StreamCreator): while len(data) > 0: if self.current_blob is None: - next_blob_creator = self.blob_manager.get_blob_creator() + self.next_blob_creator = self.blob_manager.get_blob_creator() self.blob_count += 1 iv = self.iv_generator.next() - self.current_blob = self._get_blob_maker(iv, next_blob_creator) + self.current_blob = self._get_blob_maker(iv, self.next_blob_creator) done, num_bytes_written = self.current_blob.write(data) data = data[num_bytes_written:] if done is True: - close_blob(self.current_blob) + d = self.current_blob.close() + d.addCallback(self._blob_finished) + d.addCallback(lambda _: self.blob_manager.creator_finished(self.next_blob_creator)) + self.finished_deferreds.append(d) self.current_blob = None def _get_blob_maker(self, iv, blob_creator): diff --git a/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py index cfed214f4..3604ea503 100644 --- a/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ b/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py @@ -12,7 +12,6 @@ from lbrynet.core import Session from lbrynet.core.server import DHTHashAnnouncer from lbrynet.file_manager import EncryptedFileCreator from lbrynet.file_manager import EncryptedFileManager - from tests import mocks from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir