diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py
index 0a90dea2f..ac13c6d61 100644
--- a/lbrynet/core/HashBlob.py
+++ b/lbrynet/core/HashBlob.py
@@ -1,10 +1,7 @@
-from StringIO import StringIO
 from io import BytesIO
 import logging
 import os
-import tempfile
 import threading
-import shutil
 from twisted.internet import interfaces, defer, threads
 from twisted.protocols.basic import FileSender
 from twisted.web.client import FileBodyProducer
@@ -120,7 +117,6 @@ class HashBlob(object):
         return False

     def read(self, write_func):
-
         def close_self(*args):
             self.close_read_handle(file_handle)
             return args[0]
@@ -182,22 +178,22 @@ class HashBlob(object):
         return d

     def open_for_writing(self, peer):
-        pass
+        raise NotImplementedError()

     def open_for_reading(self):
-        pass
+        raise NotImplementedError()

     def delete(self):
-        pass
+        raise NotImplementedError()

     def close_read_handle(self, file_handle):
-        pass
+        raise NotImplementedError()

     def _close_writer(self, writer):
-        pass
+        raise NotImplementedError()

     def _save_verified_blob(self, writer):
-        pass
+        raise NotImplementedError()

     def __str__(self):
         return self.blob_hash[:16]
@@ -209,12 +205,13 @@ class HashBlob(object):

 class BlobFile(HashBlob):
     """A HashBlob which will be saved to the hard disk of the downloader"""
-    def __init__(self, blob_dir, *args):
-        HashBlob.__init__(self, *args)
+    def __init__(self, blob_dir, blob_hash, length=None):
+        HashBlob.__init__(self, blob_hash, length)
         self.blob_dir = blob_dir
         self.file_path = os.path.join(blob_dir, self.blob_hash)
         self.setting_verified_blob_lock = threading.Lock()
         self.moved_verified_blob = False
+        self.buffer = BytesIO()
         if os.path.isfile(self.file_path):
             self.set_length(os.path.getsize(self.file_path))
             # This assumes that the hash of the blob has already been
@@ -227,10 +224,8 @@ class BlobFile(HashBlob):
     def open_for_writing(self, peer):
         if not peer in self.writers:
             log.debug("Opening %s to be written by %s", str(self), str(peer))
-            write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
             finished_deferred = defer.Deferred()
-            writer = HashBlobWriter(write_file, self.get_length, self.writer_finished)
-
+            writer = HashBlobWriter(self.buffer, self.get_length, self.writer_finished)
             self.writers[peer] = (writer, finished_deferred)
             return finished_deferred, writer.write, writer.cancel
         log.warning("Tried to download the same file twice simultaneously from the same peer")
@@ -278,137 +273,48 @@ class BlobFile(HashBlob):
     def _close_writer(self, writer):
         if writer.write_handle is not None:
             log.debug("Closing %s", str(self))
-            name = writer.write_handle.name
             writer.write_handle.close()
-            threads.deferToThread(os.remove, name)
             writer.write_handle = None

+    @defer.inlineCallbacks
     def _save_verified_blob(self, writer):
-
-        def move_file():
-            with self.setting_verified_blob_lock:
-                if self.moved_verified_blob is False:
-                    temp_file_name = writer.write_handle.name
-                    writer.write_handle.close()
-                    shutil.move(temp_file_name, self.file_path)
-                    writer.write_handle = None
-                    self.moved_verified_blob = True
-                    return True
-                else:
-                    raise DownloadCanceledError()
-
-        return threads.deferToThread(move_file)
+        with self.setting_verified_blob_lock:
+            if self.moved_verified_blob is False:
+                self.buffer.seek(0)
+                out_path = os.path.join(self.blob_dir, self.blob_hash)
+                producer = FileBodyProducer(self.buffer)
+                yield producer.startProducing(open(out_path, 'wb'))
+                self.moved_verified_blob = True
+                defer.returnValue(True)
+            else:
+                raise DownloadCanceledError()


-class TempBlob(HashBlob):
-    """A HashBlob which will only exist in memory"""
-    def __init__(self, *args):
-        HashBlob.__init__(self, *args)
-        self.data_buffer = ""
-
-    def open_for_writing(self, peer):
-        if not peer in self.writers:
-            temp_buffer = StringIO()
-            finished_deferred = defer.Deferred()
-            writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished)
-
-            self.writers[peer] = (writer, finished_deferred)
-            return finished_deferred, writer.write, writer.cancel
-        return None, None, None
-
-    def open_for_reading(self):
-        if self._verified is True:
-            return StringIO(self.data_buffer)
-        return None
-
-    def delete(self):
-        if not self.writers and not self.readers:
-            self._verified = False
-            self.data_buffer = ''
-            return defer.succeed(True)
-        else:
-            return defer.fail(Failure(
-                ValueError("Blob is currently being read or written and cannot be deleted")))
-
-    def close_read_handle(self, file_handle):
-        file_handle.close()
-
-    def _close_writer(self, writer):
-        if writer.write_handle is not None:
-            writer.write_handle.close()
-            writer.write_handle = None
-
-    def _save_verified_blob(self, writer):
-        if not self.data_buffer:
-            self.data_buffer = writer.write_handle.getvalue()
-            writer.write_handle.close()
-            writer.write_handle = None
-            return defer.succeed(True)
-        else:
-            return defer.fail(Failure(DownloadCanceledError()))
-
-
-class HashBlobCreator(object):
-    def __init__(self):
+class BlobFileCreator(object):
+    def __init__(self, blob_dir):
+        self.blob_dir = blob_dir
+        self.buffer = BytesIO()
+        self._is_open = True
         self._hashsum = get_lbry_hash_obj()
         self.len_so_far = 0
         self.blob_hash = None
         self.length = None

-    def open(self):
-        pass
-
+    @defer.inlineCallbacks
     def close(self):
         self.length = self.len_so_far
-        if self.length == 0:
-            self.blob_hash = None
-        else:
-            self.blob_hash = self._hashsum.hexdigest()
-        d = self._close()
-        if self.blob_hash is not None:
-            d.addCallback(lambda _: self.blob_hash)
-        else:
-            d.addCallback(lambda _: None)
-        return d
-
-    def write(self, data):
-        self._hashsum.update(data)
-        self.len_so_far += len(data)
-        self._write(data)
-
-    def _close(self):
-        pass
-
-    def _write(self, data):
-        pass
-
-
-class BlobFileCreator(HashBlobCreator):
-    def __init__(self, blob_dir):
-        HashBlobCreator.__init__(self)
-        self.blob_dir = blob_dir
-        self.buffer = BytesIO()
-
-    def _close(self):
-        if self.blob_hash is not None:
+        self.blob_hash = self._hashsum.hexdigest()
+        if self.blob_hash and self._is_open:
             self.buffer.seek(0)
             out_path = os.path.join(self.blob_dir, self.blob_hash)
             producer = FileBodyProducer(self.buffer)
-            return producer.startProducing(open(out_path, 'wb'))
-        return defer.succeed(True)
+            yield producer.startProducing(open(out_path, 'wb'))
+        self._is_open = False
+        defer.returnValue(self.blob_hash)

-    def _write(self, data):
+    def write(self, data):
+        if not self._is_open:
+            raise IOError
+        self._hashsum.update(data)
+        self.len_so_far += len(data)
         self.buffer.write(data)
-
-
-class TempBlobCreator(HashBlobCreator):
-    def __init__(self):
-        HashBlobCreator.__init__(self)
-        # TODO: use StringIO
-        self.data_buffer = ''
-
-    def _close(self):
-        return defer.succeed(True)
-
-    def _write(self, data):
-        self.data_buffer += data
diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py
index fffc44c9a..02d9a1693 100644
--- a/scripts/download_blob_from_peer.py
+++ b/scripts/download_blob_from_peer.py
@@ -2,7 +2,7 @@
 import argparse
 import logging
 import sys
-import tempfile
+import os

 from twisted.internet import defer
 from twisted.internet import reactor
@@ -31,13 +31,14 @@ def main(args=None):
     parser.add_argument('--timeout', type=int, default=30)
     parser.add_argument('peer')
     parser.add_argument('blob_hash')
+    parser.add_argument('directory', type=str, default=os.getcwd())
     args = parser.parse_args(args)

     log_support.configure_console(level='DEBUG')

     announcer = HashAnnouncer.DummyHashAnnouncer()
     blob_manager = MyBlobManager(announcer)
-    blob = HashBlob.TempBlob(args.blob_hash, False)
+    blob = HashBlob.BlobFile(args.directory, args.blob_hash)
     download_manager = SingleBlobDownloadManager(blob)
     peer = Peer.Peer(*conf.server_port(args.peer))
     payment_rate_manager = DumbPaymentRateManager()
diff --git a/tests/unit/cryptstream/test_cryptblob.py b/tests/unit/cryptstream/test_cryptblob.py
index cb1110d77..624f1a747 100644
--- a/tests/unit/cryptstream/test_cryptblob.py
+++ b/tests/unit/cryptstream/test_cryptblob.py
@@ -1,7 +1,6 @@
 from twisted.trial import unittest
 from twisted.internet import defer
 from lbrynet.cryptstream import CryptBlob
-from lbrynet.core.HashBlob import TempBlobCreator
 from lbrynet import conf

 from tests.mocks import mock_conf_settings
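
A minimal usage sketch of the BlobFileCreator API as it stands after this patch (not part of the diff itself; the blob directory path and example payload are assumptions for illustration):

import logging

from twisted.internet import defer, task

from lbrynet.core.HashBlob import BlobFileCreator

log = logging.getLogger(__name__)


@defer.inlineCallbacks
def write_example_blob(reactor, blob_dir):
    creator = BlobFileCreator(blob_dir)      # buffers writes in an in-memory BytesIO
    creator.write(b'example blob payload')   # updates the running hash and the buffer
    blob_hash = yield creator.close()        # flushes the buffer to <blob_dir>/<blob_hash>
    log.info("stored blob %s in %s", blob_hash, blob_dir)
    defer.returnValue(blob_hash)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # task.react runs the reactor until the deferred returned by the callable fires.
    task.react(write_example_blob, ['/tmp/blobs'])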