diff --git a/CHANGELOG.md b/CHANGELOG.md
index 948c71531..0e645f9b0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,8 +13,8 @@ at anytime.
   *
 
 ### Fixed
-  *
-  *
+  * Fixed handling cancelled blob and availability requests
+  * Fixed redundant blob requests to a peer
 
 ### Deprecated
   * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
@@ -25,12 +25,19 @@ at anytime.
   *
 
 ### Added
-  *
-  *
+  * Added WAL pragma to sqlite3
+  * Added unit tests for `BlobFile`
+  * Updated exchange rate tests for the lbry.io api
+  * Use `hashlib` for sha384 instead of `pycrypto`
+  * Use `cryptography` instead of `pycrypto` for blob encryption and decryption
+  * Use `cryptography` for PKCS7 instead of doing it manually
+  * Use `BytesIO` buffers instead of temp files when processing blobs
+  * Refactored and pruned blob related classes into `lbrynet.blobs`
+  * Changed several `assert`s to raise more useful errors
 
 ### Removed
-  *
-  *
+  * Removed `TempBlobFile`
+  * Removed unused `EncryptedFileOpener`
 
 ## [0.16.1] - 2017-09-20
diff --git a/lbrynet/blob/__init__.py b/lbrynet/blob/__init__.py
new file mode 100644
index 000000000..e605ea317
--- /dev/null
+++ b/lbrynet/blob/__init__.py
@@ -0,0 +1,4 @@
+from blob_file import BlobFile
+from creator import BlobFileCreator
+from writer import HashBlobWriter
+from reader import HashBlobReader
diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py
new file mode 100644
index 000000000..f1a2010ee
--- /dev/null
+++ b/lbrynet/blob/blob_file.py
@@ -0,0 +1,227 @@
+import logging
+import os
+import threading
+from twisted.internet import defer, threads
+from twisted.protocols.basic import FileSender
+from twisted.web.client import FileBodyProducer
+from twisted.python.failure import Failure
+from lbrynet import conf
+from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
+from lbrynet.core.utils import is_valid_blobhash
+from lbrynet.blob.writer import HashBlobWriter
+from lbrynet.blob.reader import HashBlobReader
+
+
+log = logging.getLogger(__name__)
+
+
+class BlobFile(object):
+    """
+    A chunk of data available on the network which is specified by a hashsum.
+
+    This class is used to create blobs on the local filesystem
+    when we already know the blob hash beforehand (i.e., when downloading blobs).
+    It can also be used for reading from blobs on the local filesystem.
+    """
+
+    def __str__(self):
+        return self.blob_hash[:16]
+
+    def __repr__(self):
+        return '<{}({})>'.format(self.__class__.__name__, str(self))
+
+    def __init__(self, blob_dir, blob_hash, length=None):
+        if not is_valid_blobhash(blob_hash):
+            raise InvalidBlobHashError(blob_hash)
+        self.blob_hash = blob_hash
+        self.length = length
+        self.writers = {}  # {Peer: (writer, finished_deferred)}
+        self._verified = False
+        self.readers = 0
+        self.blob_dir = blob_dir
+        self.file_path = os.path.join(blob_dir, self.blob_hash)
+        self.setting_verified_blob_lock = threading.Lock()
+        self.moved_verified_blob = False
+        if os.path.isfile(self.file_path):
+            self.set_length(os.path.getsize(self.file_path))
+            # This assumes that the hash of the blob has already been
+            # checked as part of the blob creation process. It might
+            # be worth having a function that checks the actual hash;
+            # it's probably too expensive to have that check be part of
+            # this call.
+            self._verified = True
+
+    def open_for_writing(self, peer):
+        """
+        open a blob file to be written by peer, supports concurrent
+        writers, as long as they are from different peers.
+
+        returns a tuple of (writer, finished_deferred)
+
+        writer - a file-like object with a write() function; call close() when finished
+        finished_deferred - deferred that is fired when the write is finished and returns
+            an instance of itself as a HashBlob
+        """
+        if peer not in self.writers:
+            log.debug("Opening %s to be written by %s", str(self), str(peer))
+            finished_deferred = defer.Deferred()
+            writer = HashBlobWriter(self.get_length, self.writer_finished)
+            self.writers[peer] = (writer, finished_deferred)
+            return (writer, finished_deferred)
+        log.warning("Tried to download the same file twice simultaneously from the same peer")
+        return None, None
+
+    def open_for_reading(self):
+        """
+        open blob for reading
+
+        returns a file handle that can be read() from.
+        once finished with the file handle, the user must call close_read_handle(),
+        otherwise the blob cannot be deleted.
+        """
+        if self._verified is True:
+            file_handle = None
+            try:
+                file_handle = open(self.file_path, 'rb')
+                self.readers += 1
+                return file_handle
+            except IOError:
+                log.exception('Failed to open %s', self.file_path)
+                self.close_read_handle(file_handle)
+        return None
+
+    def delete(self):
+        """
+        delete blob file from the file system, prevent deletion
+        if a blob is being read from or written to
+
+        returns a deferred that fires when the delete is completed
+        """
+        if not self.writers and not self.readers:
+            self._verified = False
+            self.moved_verified_blob = False
+
+            def delete_from_file_system():
+                if os.path.isfile(self.file_path):
+                    os.remove(self.file_path)
+
+            d = threads.deferToThread(delete_from_file_system)
+
+            def log_error(err):
+                log.warning("An error occurred deleting %s: %s",
+                            str(self.file_path), err.getErrorMessage())
+                return err
+
+            d.addErrback(log_error)
+            return d
+        else:
+            return defer.fail(Failure(
+                ValueError("File is currently being read or written and cannot be deleted")))
+
+    @property
+    def verified(self):
+        """
+        Protect verified from being modified by other classes.
+        verified is True if a write to a blob has completed successfully,
+        or if a blob on disk was found to have the same length as specified
+        in init.
+        """
+        return self._verified
+
+    def set_length(self, length):
+        if self.length is not None and length == self.length:
+            return True
+        if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
+            self.length = length
+            return True
+        log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
+                    self.length, length)
+        return False
+
+    def get_length(self):
+        return self.length
+
+    def get_is_verified(self):
+        return self.verified
+
+    def is_downloading(self):
+        if self.writers:
+            return True
+        return False
+
+    def read(self, write_func):
+        def close_self(*args):
+            self.close_read_handle(file_handle)
+            return args[0]
+
+        file_sender = FileSender()
+        reader = HashBlobReader(write_func)
+        file_handle = self.open_for_reading()
+        if file_handle is not None:
+            d = file_sender.beginFileTransfer(file_handle, reader)
+            d.addCallback(close_self)
+        else:
+            d = defer.fail(IOError("Could not read the blob"))
+        return d
+
+    def writer_finished(self, writer, err=None):
+        def fire_finished_deferred():
+            self._verified = True
+            for p, (w, finished_deferred) in self.writers.items():
+                if w == writer:
+                    del self.writers[p]
+                    finished_deferred.callback(self)
+                    return True
+            log.warning(
+                "Somehow, the writer that was accepted as being valid was already removed: %s",
+                writer)
+            return False
+
+        def errback_finished_deferred(err):
+            for p, (w, finished_deferred) in self.writers.items():
+                if w == writer:
+                    del self.writers[p]
+                    finished_deferred.errback(err)
+
+        def cancel_other_downloads():
+            for p, (w, finished_deferred) in self.writers.items():
+                w.close()
+
+        if err is None:
+            if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
+                if self._verified is False:
+                    d = self._save_verified_blob(writer)
+                    d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
+                    d.addCallback(lambda _: cancel_other_downloads())
+                else:
+                    errback_finished_deferred(Failure(DownloadCanceledError()))
+                    d = defer.succeed(True)
+            else:
+                err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
+                err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
+                                               writer.blob_hash)
+                errback_finished_deferred(Failure(InvalidDataError(err_string)))
+                d = defer.succeed(True)
+        else:
+            errback_finished_deferred(err)
+            d = defer.succeed(True)
+        d.addBoth(lambda _: writer.close_handle())
+        return d
+
+    def close_read_handle(self, file_handle):
+        if file_handle is not None:
+            file_handle.close()
+            self.readers -= 1
+
+    @defer.inlineCallbacks
+    def _save_verified_blob(self, writer):
+        with self.setting_verified_blob_lock:
+            if self.moved_verified_blob is False:
+                writer.write_handle.seek(0)
+                out_path = os.path.join(self.blob_dir, self.blob_hash)
+                producer = FileBodyProducer(writer.write_handle)
+                yield producer.startProducing(open(out_path, 'wb'))
+                self.moved_verified_blob = True
+                defer.returnValue(True)
+            else:
+                raise DownloadCanceledError()
diff --git a/lbrynet/blob/creator.py b/lbrynet/blob/creator.py
new file mode 100644
index 000000000..963986d5c
--- /dev/null
+++ b/lbrynet/blob/creator.py
@@ -0,0 +1,51 @@
+import os
+import logging
+from io import BytesIO
+from twisted.internet import defer
+from twisted.web.client import FileBodyProducer
+from lbrynet.core.cryptoutils import get_lbry_hash_obj
+
+log = logging.getLogger(__name__)
+
+
+class BlobFileCreator(object):
+    """
+    This class is used to create blobs on the local filesystem
+    when we do not know the blob hash beforehand (i.e., when creating
+    a new stream)
+    """
+    def __init__(self, blob_dir):
+        self.blob_dir = blob_dir
+        self.buffer = BytesIO()
+        self._is_open = True
+        self._hashsum = get_lbry_hash_obj()
+        self.len_so_far = 0
+        self.blob_hash = None
+        self.length = None
+
+    @defer.inlineCallbacks
+    def close(self):
+        self.length = self.len_so_far
+        self.blob_hash = self._hashsum.hexdigest()
+        if self.blob_hash and self._is_open and self.length > 0:
+            # do not save 0 length files (empty tail blob in streams)
+            # or if it's been closed already
+            self.buffer.seek(0)
+            out_path = os.path.join(self.blob_dir, self.blob_hash)
+            producer = FileBodyProducer(self.buffer)
+            yield producer.startProducing(open(out_path, 'wb'))
+            self._is_open = False
+        if self.length > 0:
+            defer.returnValue(self.blob_hash)
+        else:
+            # 0 length files (empty tail blob in streams)
+            # must return None as their blob_hash for
+            # it to be saved properly by EncryptedFileMetadataManagers
+            defer.returnValue(None)
+
+    def write(self, data):
+        if not self._is_open:
+            raise IOError
+        self._hashsum.update(data)
+        self.len_so_far += len(data)
+        self.buffer.write(data)
diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py
new file mode 100644
index 000000000..c85cc38f3
--- /dev/null
+++ b/lbrynet/blob/reader.py
@@ -0,0 +1,30 @@
+import logging
+from twisted.internet import interfaces
+from zope.interface import implements
+
+log = logging.getLogger(__name__)
+
+
+class HashBlobReader(object):
+    implements(interfaces.IConsumer)
+
+    def __init__(self, write_func):
+        self.write_func = write_func
+
+    def registerProducer(self, producer, streaming):
+        from twisted.internet import reactor
+
+        self.producer = producer
+        self.streaming = streaming
+        if self.streaming is False:
+            reactor.callLater(0, self.producer.resumeProducing)
+
+    def unregisterProducer(self):
+        pass
+
+    def write(self, data):
+        from twisted.internet import reactor
+
+        self.write_func(data)
+        if self.streaming is False:
+            reactor.callLater(0, self.producer.resumeProducing)
diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py
new file mode 100644
index 000000000..a95430386
--- /dev/null
+++ b/lbrynet/blob/writer.py
@@ -0,0 +1,53 @@
+import logging
+from io import BytesIO
+from twisted.python.failure import Failure
+from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
+from lbrynet.core.cryptoutils import get_lbry_hash_obj
+
+log = logging.getLogger(__name__)
+
+
+class HashBlobWriter(object):
+    def __init__(self, length_getter, finished_cb):
+        self.write_handle = BytesIO()
+        self.length_getter = length_getter
+        self.finished_cb = finished_cb
+        self.finished_cb_d = None
+        self._hashsum = get_lbry_hash_obj()
+        self.len_so_far = 0
+
+    @property
+    def blob_hash(self):
+        return self._hashsum.hexdigest()
+
+    def write(self, data):
+        if self.write_handle is None:
+            log.exception("writer has already been closed")
+            raise IOError('I/O operation on closed file')
+
+        self._hashsum.update(data)
+        self.len_so_far += len(data)
+        if self.len_so_far > self.length_getter():
+            self.finished_cb_d = self.finished_cb(
+                self,
+                Failure(InvalidDataError("Length so far is greater than the expected length."
+ " %s to %s" % (self.len_so_far, + self.length_getter())))) + else: + self.write_handle.write(data) + if self.len_so_far == self.length_getter(): + self.finished_cb_d = self.finished_cb(self) + + def close_handle(self): + if self.write_handle is not None: + self.write_handle.close() + self.write_handle = None + + def close(self, reason=None): + # if we've already called finished_cb because we either finished writing + # or closed already, do nothing + if self.finished_cb_d is not None: + return + if reason is None: + reason = Failure(DownloadCanceledError()) + self.finished_cb_d = self.finished_cb(self, reason) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index b5407604c..e3b41d3cf 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -6,7 +6,8 @@ import sqlite3 from twisted.internet import threads, defer, reactor from twisted.enterprise import adbapi from lbrynet import conf -from lbrynet.core.HashBlob import BlobFile, BlobFileCreator +from lbrynet.blob.blob_file import BlobFile +from lbrynet.blob.creator import BlobFileCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -29,7 +30,6 @@ class DiskBlobManager(DHTHashSupplier): self.blob_dir = blob_dir self.db_file = os.path.join(db_dir, "blobs.db") self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) - self.blob_type = BlobFile self.blob_creator_type = BlobFileCreator # TODO: consider using an LRU for blobs as there could potentially # be thousands of blobs loaded up, many stale @@ -51,7 +51,8 @@ class DiskBlobManager(DHTHashSupplier): """Return a blob identified by blob_hash, which may be a new blob or a blob that is already on the hard disk """ - assert length is None or isinstance(length, int) + if length is not None and not isinstance(length, int): + raise Exception("invalid length type: %s (%s)", length, str(type(length))) if blob_hash in self.blobs: return defer.succeed(self.blobs[blob_hash]) return self._make_new_blob(blob_hash, length) @@ -61,7 +62,7 @@ class DiskBlobManager(DHTHashSupplier): def _make_new_blob(self, blob_hash, length=None): log.debug('Making a new blob for %s', blob_hash) - blob = self.blob_type(self.blob_dir, blob_hash, length) + blob = BlobFile(self.blob_dir, blob_hash, length) self.blobs[blob_hash] = blob return defer.succeed(blob) @@ -89,10 +90,13 @@ class DiskBlobManager(DHTHashSupplier): def creator_finished(self, blob_creator, should_announce): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) - assert blob_creator.blob_hash is not None - assert blob_creator.blob_hash not in self.blobs - assert blob_creator.length is not None - new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length) + if blob_creator.blob_hash is None: + raise Exception("Blob hash is None") + if blob_creator.blob_hash in self.blobs: + raise Exception("Creator finished for blob that is already marked as completed") + if blob_creator.length is None: + raise Exception("Blob has a length of 0") + new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob next_announce_time = self.get_next_announce_time() d = self.blob_completed(new_blob, next_announce_time, should_announce) @@ -137,6 +141,7 @@ class DiskBlobManager(DHTHashSupplier): # threads. 
def create_tables(transaction): + transaction.execute('PRAGMA journal_mode=WAL') transaction.execute("create table if not exists blobs (" + " blob_hash text primary key, " + " blob_length integer, " + @@ -255,5 +260,3 @@ class DiskBlobManager(DHTHashSupplier): "insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) return d - - diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py deleted file mode 100644 index d80cede96..000000000 --- a/lbrynet/core/HashBlob.py +++ /dev/null @@ -1,413 +0,0 @@ -from StringIO import StringIO -import logging -import os -import tempfile -import threading -import shutil -from twisted.internet import interfaces, defer, threads -from twisted.protocols.basic import FileSender -from twisted.python.failure import Failure -from zope.interface import implements -from lbrynet import conf -from lbrynet.core.Error import DownloadCanceledError, InvalidDataError -from lbrynet.core.cryptoutils import get_lbry_hash_obj -from lbrynet.core.utils import is_valid_blobhash - - -log = logging.getLogger(__name__) - - -class HashBlobReader(object): - implements(interfaces.IConsumer) - - def __init__(self, write_func): - self.write_func = write_func - - def registerProducer(self, producer, streaming): - - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - pass - - def write(self, data): - - from twisted.internet import reactor - - self.write_func(data) - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - -class HashBlobWriter(object): - def __init__(self, write_handle, length_getter, finished_cb): - self.write_handle = write_handle - self.length_getter = length_getter - self.finished_cb = finished_cb - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - - @property - def blob_hash(self): - return self._hashsum.hexdigest() - - def write(self, data): - self._hashsum.update(data) - self.len_so_far += len(data) - if self.len_so_far > self.length_getter(): - self.finished_cb( - self, - Failure(InvalidDataError("Length so far is greater than the expected length." - " %s to %s" % (self.len_so_far, - self.length_getter())))) - else: - if self.write_handle is None: - log.debug("Tried to write to a write_handle that was None.") - return - self.write_handle.write(data) - if self.len_so_far == self.length_getter(): - self.finished_cb(self) - - def cancel(self, reason=None): - if reason is None: - reason = Failure(DownloadCanceledError()) - self.finished_cb(self, reason) - - -class HashBlob(object): - """A chunk of data available on the network which is specified by a hashsum""" - - def __init__(self, blob_hash, length=None): - assert is_valid_blobhash(blob_hash) - self.blob_hash = blob_hash - self.length = length - self.writers = {} # {Peer: writer, finished_deferred} - self.finished_deferred = None - self._verified = False - self.readers = 0 - - @property - def verified(self): - # protect verified from being modified by other classes - return self._verified - - def set_length(self, length): - if self.length is not None and length == self.length: - return True - if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']: - self.length = length - return True - log.warning("Got an invalid length. 
Previous length: %s, Invalid length: %s", - self.length, length) - return False - - def get_length(self): - return self.length - - def is_validated(self): - return bool(self._verified) - - def is_downloading(self): - if self.writers: - return True - return False - - def read(self, write_func): - - def close_self(*args): - self.close_read_handle(file_handle) - return args[0] - - file_sender = FileSender() - reader = HashBlobReader(write_func) - file_handle = self.open_for_reading() - if file_handle is not None: - d = file_sender.beginFileTransfer(file_handle, reader) - d.addCallback(close_self) - else: - d = defer.fail(ValueError("Could not read the blob")) - return d - - def writer_finished(self, writer, err=None): - - def fire_finished_deferred(): - self._verified = True - for p, (w, finished_deferred) in self.writers.items(): - if w == writer: - finished_deferred.callback(self) - del self.writers[p] - return True - log.warning( - "Somehow, the writer that was accepted as being valid was already removed: %s", - writer) - return False - - def errback_finished_deferred(err): - for p, (w, finished_deferred) in self.writers.items(): - if w == writer: - finished_deferred.errback(err) - del self.writers[p] - - def cancel_other_downloads(): - for p, (w, finished_deferred) in self.writers.items(): - w.cancel() - - if err is None: - if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: - if self._verified is False: - d = self._save_verified_blob(writer) - d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) - d.addCallback(lambda _: cancel_other_downloads()) - else: - errback_finished_deferred(Failure(DownloadCanceledError())) - d = defer.succeed(True) - else: - err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}" - err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash, - writer.blob_hash) - errback_finished_deferred(Failure(InvalidDataError(err_string))) - d = defer.succeed(True) - else: - errback_finished_deferred(err) - d = defer.succeed(True) - - d.addBoth(lambda _: self._close_writer(writer)) - return d - - def open_for_writing(self, peer): - pass - - def open_for_reading(self): - pass - - def delete(self): - pass - - def close_read_handle(self, file_handle): - pass - - def _close_writer(self, writer): - pass - - def _save_verified_blob(self, writer): - pass - - def __str__(self): - return self.blob_hash[:16] - - def __repr__(self): - return '<{}({})>'.format(self.__class__.__name__, str(self)) - - -class BlobFile(HashBlob): - """A HashBlob which will be saved to the hard disk of the downloader""" - - def __init__(self, blob_dir, *args): - HashBlob.__init__(self, *args) - self.blob_dir = blob_dir - self.file_path = os.path.join(blob_dir, self.blob_hash) - self.setting_verified_blob_lock = threading.Lock() - self.moved_verified_blob = False - if os.path.isfile(self.file_path): - self.set_length(os.path.getsize(self.file_path)) - # This assumes that the hash of the blob has already been - # checked as part of the blob creation process. It might - # be worth having a function that checks the actual hash; - # its probably too expensive to have that check be part of - # this call. 
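# ---- editor's note --------------------------------------------------------
# The comment above (carried over verbatim into the new blob_file.py) notes
# that nothing re-hashes a blob found on disk because it would be expensive.
# If such a check were ever wanted, it could look like this hypothetical
# helper -- a sketch, not part of this patch; only get_lbry_hash_obj() is a
# real project function (the codebase's sha384 factory).
from lbrynet.core.cryptoutils import get_lbry_hash_obj

def blob_on_disk_matches_hash(file_path, blob_hash, chunk_size=2 ** 16):
    h = get_lbry_hash_obj()
    with open(file_path, 'rb') as f:
        # read in chunks so large blobs do not get loaded into memory at once
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)
    return h.hexdigest() == blob_hash
# ----------------------------------------------------------------------------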
- self._verified = True - - def open_for_writing(self, peer): - if not peer in self.writers: - log.debug("Opening %s to be written by %s", str(self), str(peer)) - write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) - finished_deferred = defer.Deferred() - writer = HashBlobWriter(write_file, self.get_length, self.writer_finished) - - self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.cancel - log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None, None - - def open_for_reading(self): - if self._verified is True: - file_handle = None - try: - file_handle = open(self.file_path, 'rb') - self.readers += 1 - return file_handle - except IOError: - log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) - return None - - def delete(self): - if not self.writers and not self.readers: - self._verified = False - self.moved_verified_blob = False - - def delete_from_file_system(): - if os.path.isfile(self.file_path): - os.remove(self.file_path) - - d = threads.deferToThread(delete_from_file_system) - - def log_error(err): - log.warning("An error occurred deleting %s: %s", - str(self.file_path), err.getErrorMessage()) - return err - - d.addErrback(log_error) - return d - else: - return defer.fail(Failure( - ValueError("File is currently being read or written and cannot be deleted"))) - - def close_read_handle(self, file_handle): - if file_handle is not None: - file_handle.close() - self.readers -= 1 - - def _close_writer(self, writer): - if writer.write_handle is not None: - log.debug("Closing %s", str(self)) - name = writer.write_handle.name - writer.write_handle.close() - threads.deferToThread(os.remove, name) - writer.write_handle = None - - def _save_verified_blob(self, writer): - - def move_file(): - with self.setting_verified_blob_lock: - if self.moved_verified_blob is False: - temp_file_name = writer.write_handle.name - writer.write_handle.close() - shutil.move(temp_file_name, self.file_path) - writer.write_handle = None - self.moved_verified_blob = True - return True - else: - raise DownloadCanceledError() - - return threads.deferToThread(move_file) - - -class TempBlob(HashBlob): - """A HashBlob which will only exist in memory""" - def __init__(self, *args): - HashBlob.__init__(self, *args) - self.data_buffer = "" - - def open_for_writing(self, peer): - if not peer in self.writers: - temp_buffer = StringIO() - finished_deferred = defer.Deferred() - writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished) - - self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.cancel - return None, None, None - - def open_for_reading(self): - if self._verified is True: - return StringIO(self.data_buffer) - return None - - def delete(self): - if not self.writers and not self.readers: - self._verified = False - self.data_buffer = '' - return defer.succeed(True) - else: - return defer.fail(Failure( - ValueError("Blob is currently being read or written and cannot be deleted"))) - - def close_read_handle(self, file_handle): - file_handle.close() - - def _close_writer(self, writer): - if writer.write_handle is not None: - writer.write_handle.close() - writer.write_handle = None - - def _save_verified_blob(self, writer): - if not self.data_buffer: - self.data_buffer = writer.write_handle.getvalue() - writer.write_handle.close() - writer.write_handle = None - return defer.succeed(True) - else: - return 
defer.fail(Failure(DownloadCanceledError())) - - -class HashBlobCreator(object): - def __init__(self): - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - self.blob_hash = None - self.length = None - - def open(self): - pass - - def close(self): - self.length = self.len_so_far - if self.length == 0: - self.blob_hash = None - else: - self.blob_hash = self._hashsum.hexdigest() - d = self._close() - if self.blob_hash is not None: - d.addCallback(lambda _: self.blob_hash) - else: - d.addCallback(lambda _: None) - return d - - def write(self, data): - self._hashsum.update(data) - self.len_so_far += len(data) - self._write(data) - - def _close(self): - pass - - def _write(self, data): - pass - - -class BlobFileCreator(HashBlobCreator): - def __init__(self, blob_dir): - HashBlobCreator.__init__(self) - self.blob_dir = blob_dir - self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) - - def _close(self): - temp_file_name = self.out_file.name - self.out_file.close() - if self.blob_hash is not None: - shutil.move(temp_file_name, os.path.join(self.blob_dir, self.blob_hash)) - else: - os.remove(temp_file_name) - return defer.succeed(True) - - def _write(self, data): - self.out_file.write(data) - - -class TempBlobCreator(HashBlobCreator): - def __init__(self): - HashBlobCreator.__init__(self) - # TODO: use StringIO - self.data_buffer = '' - - def _close(self): - return defer.succeed(True) - - def _write(self, data): - self.data_buffer += data diff --git a/lbrynet/core/StreamCreator.py b/lbrynet/core/StreamCreator.py deleted file mode 100644 index 4aa0ae542..000000000 --- a/lbrynet/core/StreamCreator.py +++ /dev/null @@ -1,76 +0,0 @@ -import logging -from twisted.internet import interfaces, defer -from zope.interface import implements - - -log = logging.getLogger(__name__) - - -class StreamCreator(object): - """Classes which derive from this class create a 'stream', which can be any - collection of associated blobs and associated metadata. These classes - use the IConsumer interface to get data from an IProducer and transform - the data into a 'stream'""" - - implements(interfaces.IConsumer) - - def __init__(self, name): - """ - @param name: the name of the stream - """ - self.name = name - self.stopped = True - self.producer = None - self.streaming = None - self.blob_count = -1 - self.current_blob = None - self.finished_deferreds = [] - - def _blob_finished(self, blob_info): - pass - - def registerProducer(self, producer, streaming): - - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - self.stopped = False - if streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.stopped = True - self.producer = None - - def stop(self): - """Stop creating the stream. Create the terminating zero-length blob.""" - log.debug("stop has been called for StreamCreator") - self.stopped = True - if self.current_blob is not None: - current_blob = self.current_blob - d = current_blob.close() - d.addCallback(self._blob_finished) - self.finished_deferreds.append(d) - self.current_blob = None - self._finalize() - dl = defer.DeferredList(self.finished_deferreds) - dl.addCallback(lambda _: self._finished()) - return dl - - def _finalize(self): - pass - - def _finished(self): - pass - - # TODO: move the stream creation process to its own thread and - # remove the reactor from this process. 
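# ---- editor's note --------------------------------------------------------
# StreamCreator.write() just below, like the new blob/reader.py above, uses
# Twisted's pull-producer convention: a non-streaming producer emits one
# chunk per resumeProducing() call, so the consumer must re-arm it after
# every write. A self-contained sketch of that contract, assuming only the
# standard IConsumer interface (PullConsumer is an illustrative name):
from twisted.internet import interfaces, reactor
from zope.interface import implements

class PullConsumer(object):
    implements(interfaces.IConsumer)

    def __init__(self, write_func):
        self.write_func = write_func  # sink for received chunks

    def registerProducer(self, producer, streaming):
        self.producer = producer
        self.streaming = streaming
        if self.streaming is False:  # pull producer: ask for the first chunk
            reactor.callLater(0, self.producer.resumeProducing)

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.write_func(data)
        if self.streaming is False:  # ask for the next chunk
            reactor.callLater(0, self.producer.resumeProducing)
# ----------------------------------------------------------------------------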
-    def write(self, data):
-        from twisted.internet import reactor
-        self._write(data)
-        if self.stopped is False and self.streaming is False:
-            reactor.callLater(0, self.producer.resumeProducing)
-
-    def _write(self, data):
-        pass
diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py
index 95ffaa327..1ce4f7205 100644
--- a/lbrynet/core/client/BlobRequester.py
+++ b/lbrynet/core/client/BlobRequester.py
@@ -4,6 +4,7 @@ from decimal import Decimal
 
 from twisted.internet import defer
 from twisted.python.failure import Failure
+from twisted.internet.error import ConnectionAborted
 from zope.interface import implements
 
 from lbrynet.core.Error import ConnectionClosedBeforeResponseError
@@ -225,7 +226,8 @@ class RequestHelper(object):
         self.requestor._update_local_score(self.peer, score)
 
     def _request_failed(self, reason, request_type):
-        if reason.check(RequestCanceledError):
+        if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted,
+                        ConnectionClosedBeforeResponseError):
             return
         if reason.check(NoResponseError):
             self.requestor._incompatible_peers.append(self.peer)
@@ -463,13 +465,13 @@ class DownloadRequest(RequestHelper):
     def find_blob(self, to_download):
         """Return the first blob in `to_download` that is successfully opened for write."""
         for blob in to_download:
-            if blob.is_validated():
+            if blob.get_is_verified():
                 log.debug('Skipping blob %s as its already validated', blob)
                 continue
-            d, write_func, cancel_func = blob.open_for_writing(self.peer)
+            writer, d = blob.open_for_writing(self.peer)
             if d is not None:
-                return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer)
-            log.debug('Skipping blob %s as there was an issue opening it for writing', blob)
+                return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer)
+            log.warning('Skipping blob %s as there was an issue opening it for writing', blob)
         return None
 
     def _make_request(self, blob_details):
diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py
index a9dca8307..b8861a6ab 100644
--- a/lbrynet/core/client/ClientProtocol.py
+++ b/lbrynet/core/client/ClientProtocol.py
@@ -50,6 +50,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
         log.debug("Data receieved from %s", self.peer)
         self.setTimeout(None)
         self._rate_limiter.report_dl_bytes(len(data))
+
         if self._downloading_blob is True:
             self._blob_download_request.write(data)
         else:
@@ -101,8 +102,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
             d = self.add_request(blob_request)
             self._blob_download_request = blob_request
             blob_request.finished_deferred.addCallbacks(self._downloading_finished,
-                                                        self._downloading_failed)
-            blob_request.finished_deferred.addErrback(self._handle_response_error)
+                                                        self._handle_response_error)
             return d
         else:
             raise ValueError("There is already a blob download request active")
@@ -110,7 +110,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
     def cancel_requests(self):
         self.connection_closing = True
         ds = []
-        err = failure.Failure(RequestCanceledError())
+        err = RequestCanceledError()
         for key, d in self._response_deferreds.items():
             del self._response_deferreds[key]
             d.errback(err)
@@ -119,6 +119,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
             self._blob_download_request.cancel(err)
             ds.append(self._blob_download_request.finished_deferred)
             self._blob_download_request = None
+            self._downloading_blob = False
         return defer.DeferredList(ds)
 
     ######### Internal request handling #########
@@ -176,15 +177,24 @@ class ClientProtocol(Protocol, TimeoutMixin):
     def _handle_response_error(self, err):
         # If an error gets to this point, log it and kill the connection.
-        expected_errors = (MisbehavingPeerError, ConnectionClosedBeforeResponseError,
-                           DownloadCanceledError, RequestCanceledError)
-        if not err.check(expected_errors):
+        if err.check(DownloadCanceledError, RequestCanceledError, error.ConnectionAborted):
+            # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't
+            # TODO: always be this way. it's done this way now because the client has no other way
+            # TODO: of telling the server it wants the download to stop. It would be great if the
+            # TODO: protocol had such a mechanism.
+            log.info("Closing the connection to %s because the download of blob %s was canceled",
+                     self.peer, self._blob_download_request.blob)
+            result = None
+        elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError):
+            log.warning("The connection to %s is closing due to: %s", self.peer, err)
+            result = err
+        else:
             log.error("The connection to %s is closing due to an unexpected error: %s",
-                      self.peer, err.getErrorMessage())
-        if not err.check(RequestCanceledError):
-            # The connection manager is closing the connection, so
-            # there's no need to do it here.
-            return err
+                      self.peer, err)
+            result = err
+
+        self.transport.loseConnection()
+        return result
 
     def _handle_response(self, response):
         ds = []
@@ -225,7 +235,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
             log.debug("Asking for another request from %s", self.peer)
             self._ask_for_request()
         else:
-            log.debug("Not asking for another request from %s", self.peer)
+            log.warning("Not asking for another request from %s", self.peer)
             self.transport.loseConnection()
 
         dl.addCallback(get_next_request)
@@ -236,16 +246,6 @@ class ClientProtocol(Protocol, TimeoutMixin):
         self._downloading_blob = False
         return arg
 
-    def _downloading_failed(self, err):
-        if err.check(DownloadCanceledError):
-            # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't
-            # TODO: always be this way. it's done this way now because the client has no other way
-            # TODO: of telling the server it wants the download to stop. It would be great if the
-            # TODO: protocol had such a mechanism.
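# ---- editor's note --------------------------------------------------------
# Why the patch above replaced addCallbacks(cb, eb1) + addErrback(eb2) with a
# single addCallbacks(cb, eb): the two callbacks in one addCallbacks() call
# sit at the same level of the chain, so the errback only sees failures from
# earlier levels, while a separately attached addErrback() would also catch
# errors raised by cb itself. A toy illustration (names below are not from
# this patch):
from twisted.internet import defer

def show_routing():
    d = defer.Deferred()
    d.addCallbacks(lambda res: res, lambda err: "handled at same level")
    d.errback(ValueError("boom"))  # routed to the errback above
    return d  # fires with "handled at same level"; a success would skip it
# ----------------------------------------------------------------------------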
- log.debug("Closing the connection to %s because the download of blob %s was canceled", - self.peer, self._blob_download_request.blob) - return err - ######### IRateLimited ######### def throttle_upload(self): diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index 97dc4727f..f7a108c65 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -64,14 +64,14 @@ class SingleProgressManager(object): def stream_position(self): blobs = self.download_manager.blobs - if blobs and blobs[0].is_validated(): + if blobs and blobs[0].get_is_verified(): return 1 return 0 def needed_blobs(self): blobs = self.download_manager.blobs assert len(blobs) == 1 - return [b for b in blobs.itervalues() if not b.is_validated()] + return [b for b in blobs.itervalues() if not b.get_is_verified()] class DummyBlobHandler(object): diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index 29aea9d1a..bc16fe560 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -93,7 +93,7 @@ class FullStreamProgressManager(StreamProgressManager): return ( i not in blobs or ( - not blobs[i].is_validated() and + not blobs[i].get_is_verified() and i not in self.provided_blob_nums ) ) @@ -112,7 +112,7 @@ class FullStreamProgressManager(StreamProgressManager): blobs = self.download_manager.blobs return [ b for n, b in blobs.iteritems() - if not b.is_validated() and not n in self.provided_blob_nums + if not b.get_is_verified() and not n in self.provided_blob_nums ] ######### internal ######### @@ -145,7 +145,7 @@ class FullStreamProgressManager(StreamProgressManager): current_blob_num = self.last_blob_outputted + 1 - if current_blob_num in blobs and blobs[current_blob_num].is_validated(): + if current_blob_num in blobs and blobs[current_blob_num].get_is_verified(): log.debug("Outputting blob %s", str(self.last_blob_outputted + 1)) self.provided_blob_nums.append(self.last_blob_outputted + 1) d = self.download_manager.handle_blob(self.last_blob_outputted + 1) @@ -154,10 +154,11 @@ class FullStreamProgressManager(StreamProgressManager): d.addCallback(lambda _: check_if_finished()) def log_error(err): - log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage()) + log.warning("Error occurred in the output loop. 
Error: %s", err) if self.outputting_d is not None and not self.outputting_d.called: self.outputting_d.callback(True) self.outputting_d = None + self.stop() d.addErrback(log_error) else: diff --git a/lbrynet/core/cryptoutils.py b/lbrynet/core/cryptoutils.py index 7c0c5c40c..2528c7e69 100644 --- a/lbrynet/core/cryptoutils.py +++ b/lbrynet/core/cryptoutils.py @@ -1,9 +1,9 @@ -from Crypto.Hash import SHA384 import seccure +import hashlib def get_lbry_hash_obj(): - return SHA384.new() + return hashlib.sha384() def get_pub_key(pass_phrase): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index b95b3ca84..308d0c822 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -143,7 +143,7 @@ class BlobRequestHandler(object): def open_blob_for_reading(self, blob, response): response_fields = {} d = defer.succeed(None) - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: self.currently_uploading = blob diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index 0b26d6f89..a7303a588 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -1,11 +1,16 @@ import binascii import logging -from Crypto.Cipher import AES +from twisted.internet import defer +from cryptography.hazmat.primitives.ciphers import Cipher, modes +from cryptography.hazmat.primitives.ciphers.algorithms import AES +from cryptography.hazmat.primitives.padding import PKCS7 +from cryptography.hazmat.backends import default_backend from lbrynet import conf from lbrynet.core.BlobInfo import BlobInfo log = logging.getLogger(__name__) +backend = default_backend() class CryptBlobInfo(BlobInfo): @@ -31,7 +36,9 @@ class StreamBlobDecryptor(object): self.length = length self.buff = b'' self.len_read = 0 - self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) + self.unpadder = PKCS7(AES.block_size).unpadder() + self.cipher = cipher.decryptor() def decrypt(self, write_func): """ @@ -42,22 +49,23 @@ class StreamBlobDecryptor(object): """ def remove_padding(data): - pad_len = ord(data[-1]) - data, padding = data[:-1 * pad_len], data[-1 * pad_len:] - for c in padding: - assert ord(c) == pad_len - return data + return self.unpadder.update(data) + self.unpadder.finalize() def write_bytes(): if self.len_read < self.length: - num_bytes_to_decrypt = greatest_multiple(len(self.buff), self.cipher.block_size) + num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size / 8)) data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt) - write_func(self.cipher.decrypt(data_to_decrypt)) + write_func(self.cipher.update(data_to_decrypt)) def finish_decrypt(): - assert len(self.buff) % self.cipher.block_size == 0 + bytes_left = len(self.buff) % (AES.block_size / 8) + if bytes_left != 0: + log.warning(self.buff[-1 * (AES.block_size / 8):].encode('hex')) + raise Exception("blob %s has incorrect padding: %i bytes left" % + (self.blob.blob_hash, bytes_left)) data_to_decrypt, self.buff = self.buff, b'' - write_func(remove_padding(self.cipher.decrypt(data_to_decrypt))) + last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() + write_func(remove_padding(last_chunk)) def decrypt_bytes(data): self.buff += data @@ -84,8 +92,9 @@ class CryptStreamBlobMaker(object): self.iv = iv self.blob_num = blob_num self.blob = blob - self.cipher = 
AES.new(self.key, AES.MODE_CBC, self.iv) - self.buff = b'' + cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) + self.padder = PKCS7(AES.block_size).padder() + self.cipher = cipher.encryptor() self.length = 0 def write(self, data): @@ -104,39 +113,26 @@ class CryptStreamBlobMaker(object): done = True else: num_bytes_to_write = len(data) - self.length += num_bytes_to_write data_to_write = data[:num_bytes_to_write] - self.buff += data_to_write - self._write_buffer() + self.length += len(data_to_write) + padded_data = self.padder.update(data_to_write) + encrypted_data = self.cipher.update(padded_data) + self.blob.write(encrypted_data) return done, num_bytes_to_write + @defer.inlineCallbacks def close(self): log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length)) if self.length != 0: - self._close_buffer() - d = self.blob.close() - d.addCallback(self._return_info) + self.length += (AES.block_size / 8) - (self.length % (AES.block_size / 8)) + padded_data = self.padder.finalize() + encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize() + self.blob.write(encrypted_data) + + blob_hash = yield self.blob.close() log.debug("called the finished_callback from CryptStreamBlobMaker.close") - return d - - def _write_buffer(self): - num_bytes_to_encrypt = (len(self.buff) // AES.block_size) * AES.block_size - data_to_encrypt, self.buff = split(self.buff, num_bytes_to_encrypt) - encrypted_data = self.cipher.encrypt(data_to_encrypt) - self.blob.write(encrypted_data) - - def _close_buffer(self): - data_to_encrypt, self.buff = self.buff, b'' - assert len(data_to_encrypt) < AES.block_size - pad_len = AES.block_size - len(data_to_encrypt) - padded_data = data_to_encrypt + chr(pad_len) * pad_len - self.length += pad_len - assert len(padded_data) == AES.block_size - encrypted_data = self.cipher.encrypt(padded_data) - self.blob.write(encrypted_data) - - def _return_info(self, blob_hash): - return CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) + blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) + defer.returnValue(blob) def greatest_multiple(a, b): diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index e5b3c8bf3..9c94ad476 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met """ import logging - +from twisted.internet import interfaces, defer +from zope.interface import implements from Crypto import Random from Crypto.Cipher import AES - -from twisted.internet import defer -from lbrynet.core.StreamCreator import StreamCreator from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker log = logging.getLogger(__name__) -class CryptStreamCreator(StreamCreator): - """Create a new stream with blobs encrypted by a symmetric cipher. +class CryptStreamCreator(object): + """ + Create a new stream with blobs encrypted by a symmetric cipher. Each blob is encrypted with the same key, but each blob has its own initialization vector which is associated with the blob when the blob is associated with the stream. """ + + implements(interfaces.IConsumer) + def __init__(self, blob_manager, name=None, key=None, iv_generator=None): """@param blob_manager: Object that stores and provides access to blobs. 
@type blob_manager: BlobManager @@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator): @return: None """ - StreamCreator.__init__(self, name) self.blob_manager = blob_manager + self.name = name self.key = key if iv_generator is None: self.iv_generator = self.random_iv_generator() else: self.iv_generator = iv_generator + self.stopped = True + self.producer = None + self.streaming = None + self.blob_count = -1 + self.current_blob = None + self.finished_deferreds = [] + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + self.stopped = False + if streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.stopped = True + self.producer = None + + def stop(self): + """Stop creating the stream. Create the terminating zero-length blob.""" + log.debug("stop has been called for StreamCreator") + self.stopped = True + if self.current_blob is not None: + current_blob = self.current_blob + d = current_blob.close() + d.addCallback(self._blob_finished) + d.addErrback(self._error) + self.finished_deferreds.append(d) + self.current_blob = None + self._finalize() + dl = defer.DeferredList(self.finished_deferreds) + dl.addCallback(lambda _: self._finished()) + dl.addErrback(self._error) + return dl + + # TODO: move the stream creation process to its own thread and + # remove the reactor from this process. + def write(self, data): + from twisted.internet import reactor + self._write(data) + if self.stopped is False and self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + @staticmethod def random_iv_generator(): while 1: @@ -77,11 +124,6 @@ class CryptStreamCreator(StreamCreator): self.finished_deferreds.append(d) def _write(self, data): - def close_blob(blob): - d = blob.close() - d.addCallback(self._blob_finished) - self.finished_deferreds.append(d) - while len(data) > 0: if self.current_blob is None: self.next_blob_creator = self.blob_manager.get_blob_creator() @@ -94,10 +136,19 @@ class CryptStreamCreator(StreamCreator): should_announce = self.blob_count == 0 d = self.current_blob.close() d.addCallback(self._blob_finished) - d.addCallback(lambda _: self.blob_manager.creator_finished( - self.next_blob_creator, should_announce)) + d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info, + should_announce)) self.finished_deferreds.append(d) self.current_blob = None def _get_blob_maker(self, iv, blob_creator): return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) + + def _error(self, error): + log.error(error) + + def _finished(self): + raise NotImplementedError() + + def _blob_finished(self, blob_info): + raise NotImplementedError() diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 8eb218fd3..5a45b78dd 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -29,7 +29,6 @@ from lbrynet.reflector import reupload from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbry_file.StreamDescriptor import 
EncryptedFileStreamType @@ -578,17 +577,8 @@ class Daemon(AuthJSONRPCServer): self.session.wallet, self.download_directory ) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, file_saver_factory) - file_opener_factory = EncryptedFileOpenerFactory( - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, - self.session.wallet - ) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, file_opener_factory) + self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, + file_saver_factory) return defer.succeed(None) def _download_blob(self, blob_hash, rate_manager=None, timeout=None): @@ -2467,10 +2457,11 @@ class Daemon(AuthJSONRPCServer): blob_hashes = [blob_hash] elif stream_hash: blobs = yield self.get_blobs_for_stream_hash(stream_hash) - blob_hashes = [blob.blob_hash for blob in blobs if blob.is_validated()] + blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()] elif sd_hash: blobs = yield self.get_blobs_for_sd_hash(sd_hash) - blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if blob.is_validated()] + blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if + blob.get_is_verified()] else: raise Exception('single argument must be specified') yield self.session.blob_manager._immediate_announce(blob_hashes) @@ -2573,9 +2564,9 @@ class Daemon(AuthJSONRPCServer): blobs = self.session.blob_manager.blobs.itervalues() if needed: - blobs = [blob for blob in blobs if not blob.is_validated()] + blobs = [blob for blob in blobs if not blob.get_is_verified()] if finished: - blobs = [blob for blob in blobs if blob.is_validated()] + blobs = [blob for blob in blobs if blob.get_is_verified()] blob_hashes = [blob.blob_hash for blob in blobs] page_size = page_size or len(blob_hashes) diff --git a/lbrynet/daemon/ExchangeRateManager.py b/lbrynet/daemon/ExchangeRateManager.py index 998d06d0d..805df2db1 100644 --- a/lbrynet/daemon/ExchangeRateManager.py +++ b/lbrynet/daemon/ExchangeRateManager.py @@ -64,9 +64,8 @@ class MarketFeed(object): self.rate = ExchangeRate(self.market, price, int(time.time())) def _log_error(self, err): - log.warning( - "There was a problem updating %s exchange rate information from %s", - self.market, self.name) + log.warning("There was a problem updating %s exchange rate information from %s\n%s", + self.market, self.name, err) def _update_price(self): d = threads.deferToThread(self._make_request) @@ -141,7 +140,10 @@ class LBRYioBTCFeed(MarketFeed): ) def _handle_response(self, response): - json_response = json.loads(response) + try: + json_response = json.loads(response) + except ValueError: + raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response) if 'data' not in json_response: raise InvalidExchangeRateResponse(self.name, 'result not found') return defer.succeed(1.0 / json_response['data']['btc_usd']) diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 99d15e5e4..bf6d3bea7 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -29,8 +29,9 @@ class EncryptedFileStreamCreator(CryptStreamCreator): self.blob_infos = [] def _blob_finished(self, blob_info): - log.debug("length: %s", str(blob_info.length)) + log.debug("length: %s", blob_info.length) self.blob_infos.append(blob_info) + return blob_info def _save_stream_info(self): stream_info_manager = 
self.lbry_file_manager.stream_info_manager @@ -40,10 +41,6 @@ class EncryptedFileStreamCreator(CryptStreamCreator): self.blob_infos) return d - def setup(self): - d = CryptStreamCreator.setup(self) - return d - def _get_blobs_hashsum(self): blobs_hashsum = get_lbry_hash_obj() for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num): diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 212efea48..9d38548dd 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -303,8 +303,10 @@ class EncryptedFileManager(object): @rerun_if_locked def _change_file_status(self, rowid, new_status): - return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", + d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", (new_status, rowid)) + d.addCallback(lambda _: new_status) + return d @rerun_if_locked def _get_lbry_file_status(self, rowid): diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index e18c6f2cf..735c8027e 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -1,4 +1,3 @@ -import subprocess import binascii from zope.interface import implements @@ -10,8 +9,7 @@ from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler import os -from twisted.internet import defer, threads, reactor -from twisted.python.procutils import which +from twisted.internet import defer, threads import logging import traceback @@ -282,90 +280,3 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): @staticmethod def get_description(): return "Save" - - -class EncryptedFileOpener(EncryptedFileDownloader): - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet): - EncryptedFileDownloader.__init__(self, stream_hash, - peer_finder, rate_limiter, - blob_manager, stream_info_manager, - payment_rate_manager, wallet, - ) - self.process = None - self.process_log = None - - def stop(self, err=None): - d = EncryptedFileDownloader.stop(self, err=err) - d.addCallback(lambda _: self._delete_from_info_manager()) - return d - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, - download_manager) - - def _setup_output(self): - def start_process(): - if os.name == "nt": - paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe', - r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe'] - for p in paths: - if os.path.exists(p): - vlc_path = p - break - else: - raise ValueError("You must install VLC media player to stream files") - else: - vlc_path = 'vlc' - self.process_log = open("vlc.out", 'a') - try: - self.process = subprocess.Popen([vlc_path, '-'], stdin=subprocess.PIPE, - stdout=self.process_log, stderr=self.process_log) - except OSError: - raise ValueError("VLC media player could not be opened") - - d = threads.deferToThread(start_process) - return d - - def _close_output(self): - if self.process is not None: - self.process.stdin.close() - self.process = None - return defer.succeed(True) - - def _get_write_func(self): - def write_func(data): - if self.stopped is False and self.process is not None: - try: - 
self.process.stdin.write(data) - except IOError: - reactor.callLater(0, self.stop) - return write_func - - def _delete_from_info_manager(self): - return self.stream_info_manager.delete_stream(self.stream_hash) - - -class EncryptedFileOpenerFactory(EncryptedFileDownloaderFactory): - def can_download(self, sd_validator): - if which('vlc'): - return True - elif os.name == "nt": - paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe', - r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe'] - for p in paths: - if os.path.exists(p): - return True - return False - - def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - return EncryptedFileOpener(stream_hash, self.peer_finder, - self.rate_limiter, self.blob_manager, - self.stream_info_manager, - payment_rate_manager, self.wallet, - ) - - @staticmethod - def get_description(): - return "Stream" diff --git a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py index bd09dfdfc..116ac7080 100644 --- a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py +++ b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py @@ -1,5 +1,6 @@ import logging from zope.interface import implements +from twisted.internet import defer from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.interfaces import IMetadataHandler @@ -18,10 +19,11 @@ class EncryptedFileMetadataHandler(object): ######### IMetadataHandler ######### + @defer.inlineCallbacks def get_initial_blobs(self): - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - d.addCallback(self._format_initial_blobs_for_download_manager) - return d + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos) + defer.returnValue(formatted_infos) def final_blob_num(self): return self._final_blob_num @@ -30,10 +32,12 @@ class EncryptedFileMetadataHandler(object): def _format_initial_blobs_for_download_manager(self, blob_infos): infos = [] - for blob_hash, blob_num, iv, length in blob_infos: - if blob_hash is not None: + for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos): + if blob_hash is not None and length: infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv)) else: + if i != len(blob_infos) - 1: + raise Exception("Invalid stream terminator") log.debug("Setting _final_blob_num to %s", str(blob_num - 1)) self._final_blob_num = blob_num - 1 return infos diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 854dc6489..1f1c540a2 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -132,7 +132,7 @@ class BlobReflectorClient(Protocol): return self.set_not_uploading() def open_blob_for_reading(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: log.debug('Getting ready to send %s', blob.blob_hash) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 26882d186..ebf605b02 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -112,11 +112,11 @@ class EncryptedFileReflectorClient(Protocol): def get_validated_blobs(self, blobs_in_stream): def get_blobs(blobs): for (blob, _, _, blob_len) in blobs: - if blob: + if blob and blob_len: yield self.blob_manager.get_blob(blob, blob_len) dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) - 
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()])
+        dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()])
         return dl

     def set_blobs_to_send(self, blobs_to_send):
@@ -253,7 +253,7 @@
             return self.set_not_uploading()

     def open_blob_for_reading(self, blob):
-        if blob.is_validated():
+        if blob.get_is_verified():
             read_handle = blob.open_for_reading()
             if read_handle is not None:
                 log.debug('Getting ready to send %s', blob.blob_hash)
diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py
index 8467e5321..7ca4b3cde 100644
--- a/lbrynet/reflector/server/server.py
+++ b/lbrynet/reflector/server/server.py
@@ -35,11 +35,11 @@ class ReflectorServer(Protocol):
         self.peer_version = None
         self.receiving_blob = False
         self.incoming_blob = None
-        self.blob_write = None
         self.blob_finished_d = None
-        self.cancel_write = None
         self.request_buff = ""
+        self.blob_writer = None
+
     def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
         log.info("Reflector upload from %s finished" % self.peer.host)

@@ -82,14 +82,14 @@
         """
         blob = self.incoming_blob
-        self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
+        self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer)
         self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
         self.blob_finished_d.addErrback(self._on_failed_blob, response_key)

     def close_blob(self):
+        self.blob_writer.close()
+        self.blob_writer = None
         self.blob_finished_d = None
-        self.blob_write = None
-        self.cancel_write = None
         self.incoming_blob = None
         self.receiving_blob = False
@@ -99,7 +99,7 @@

     def dataReceived(self, data):
         if self.receiving_blob:
-            self.blob_write(data)
+            self.blob_writer.write(data)
         else:
             log.debug('Not yet receiving blob, data needs further processing')
             self.request_buff += data
@@ -110,7 +110,7 @@
             d.addErrback(self.handle_error)
         if self.receiving_blob and extra_data:
             log.debug('Writing extra data to blob')
-            self.blob_write(extra_data)
+            self.blob_writer.write(extra_data)

     def _get_valid_response(self, response_msg):
         extra_data = None
@@ -221,7 +221,7 @@
         sd_blob_hash = request_dict[SD_BLOB_HASH]
         sd_blob_size = request_dict[SD_BLOB_SIZE]

-        if self.blob_write is None:
+        if self.blob_writer is None:
             d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size)
             d.addCallback(self.get_descriptor_response)
             d.addCallback(self.send_response)
@@ -231,7 +231,7 @@
         return d

     def get_descriptor_response(self, sd_blob):
-        if sd_blob.is_validated():
+        if sd_blob.get_is_verified():
             d = defer.succeed({SEND_SD_BLOB: False})
             d.addCallback(self.request_needed_blobs, sd_blob)
         else:
@@ -267,7 +267,7 @@
             if 'blob_hash' in blob and 'length' in blob:
                 blob_hash, blob_len = blob['blob_hash'], blob['length']
                 d = self.blob_manager.get_blob(blob_hash, blob_len)
-                d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None)
+                d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None)
                 yield d

     def handle_blob_request(self, request_dict):
@@ -293,7 +293,7 @@
         blob_hash = request_dict[BLOB_HASH]
         blob_size = request_dict[BLOB_SIZE]

-        if self.blob_write is None:
+        if self.blob_writer is None:
             log.debug('Received info for blob: %s',
blob_hash[:16])
             d = self.blob_manager.get_blob(blob_hash, blob_size)
             d.addCallback(self.get_blob_response)
@@ -305,7 +305,7 @@
         return d

     def get_blob_response(self, blob):
-        if blob.is_validated():
+        if blob.get_is_verified():
             return defer.succeed({SEND_BLOB: False})
         else:
             self.incoming_blob = blob
diff --git a/requirements.txt b/requirements.txt
index 1f3bda1d7..d45fb6976 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 Twisted==16.6.0
+cryptography==2.0.3
 appdirs==1.4.3
 argparse==1.2.1
 docopt==0.6.2
diff --git a/scripts/decrypt_blob.py b/scripts/decrypt_blob.py
index bc905bf2e..4f5c8b8e9 100644
--- a/scripts/decrypt_blob.py
+++ b/scripts/decrypt_blob.py
@@ -10,7 +10,7 @@
 from twisted.internet import reactor

 from lbrynet import conf
 from lbrynet.cryptstream import CryptBlob
-from lbrynet.core import HashBlob
+from lbrynet.blob import BlobFile
 from lbrynet.core import log_support


@@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output):
     filename = os.path.abspath(blob_file)
     length = os.path.getsize(filename)
     directory, blob_hash = os.path.split(filename)
-    blob = HashBlob.BlobFile(directory, blob_hash, True, length)
+    blob = BlobFile(directory, blob_hash, length)
     decryptor = CryptBlob.StreamBlobDecryptor(
         blob, binascii.unhexlify(key), binascii.unhexlify(iv), length)
     with open(output, 'w') as f:
diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py
index fffc44c9a..43a510328 100644
--- a/scripts/download_blob_from_peer.py
+++ b/scripts/download_blob_from_peer.py
@@ -2,7 +2,7 @@
 import argparse
 import logging
 import sys
-import tempfile
+import os

 from twisted.internet import defer
 from twisted.internet import reactor
@@ -13,7 +13,7 @@
 from lbrynet import conf
 from lbrynet.core import log_support
 from lbrynet.core import BlobManager
 from lbrynet.core import HashAnnouncer
-from lbrynet.core import HashBlob
+from lbrynet.blob import BlobFile
 from lbrynet.core import RateLimiter
 from lbrynet.core import Peer
 from lbrynet.core import Wallet
@@ -31,13 +31,14 @@ def main(args=None):
     parser.add_argument('--timeout', type=int, default=30)
     parser.add_argument('peer')
     parser.add_argument('blob_hash')
+    parser.add_argument('directory', type=str, nargs='?', default=os.getcwd())

     args = parser.parse_args(args)
     log_support.configure_console(level='DEBUG')

     announcer = HashAnnouncer.DummyHashAnnouncer()
     blob_manager = MyBlobManager(announcer)
-    blob = HashBlob.TempBlob(args.blob_hash, False)
+    blob = BlobFile(args.directory, args.blob_hash)
     download_manager = SingleBlobDownloadManager(blob)
     peer = Peer.Peer(*conf.server_port(args.peer))
     payment_rate_manager = DumbPaymentRateManager()
diff --git a/scripts/encrypt_blob.py b/scripts/encrypt_blob.py
index 440ea7a8d..3d3552f48 100644
--- a/scripts/encrypt_blob.py
+++ b/scripts/encrypt_blob.py
@@ -1,17 +1,18 @@
 """Encrypt a single file using the given key and iv"""
 import argparse
-import binascii
 import logging
-import StringIO
 import sys

 from twisted.internet import defer
 from twisted.internet import reactor
+from twisted.protocols import basic
+from twisted.web.client import FileBodyProducer

 from lbrynet import conf
-from lbrynet.cryptstream import CryptBlob
 from lbrynet.core import log_support
-from lbrynet.core import cryptoutils
+from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
+from lbrynet.core.BlobManager import DiskBlobManager
+from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator


 log = logging.getLogger('decrypt_blob')


@@ -26,7 +27,7 @@ def main():
args = parser.parse_args() log_support.configure_console(level='DEBUG') - d = run(args) + run(args) reactor.run() @@ -40,29 +41,23 @@ def run(args): reactor.callLater(0, reactor.stop) +@defer.inlineCallbacks def encrypt_blob(filename, key, iv): - blob = Blob() - blob_maker = CryptBlob.CryptStreamBlobMaker( - binascii.unhexlify(key), binascii.unhexlify(iv), 0, blob) - with open(filename) as fin: - blob_maker.write(fin.read()) - blob_maker.close() + dummy_announcer = DummyHashAnnouncer() + manager = DiskBlobManager(dummy_announcer, '.', '.') + yield manager.setup() + creator = CryptStreamCreator(manager, filename, key, iv_generator(iv)) + with open(filename, 'r') as infile: + producer = FileBodyProducer(infile, readSize=2**22) + yield producer.startProducing(creator) + yield creator.stop() -class Blob(object): - def __init__(self): - self.data = StringIO.StringIO() - - def write(self, data): - self.data.write(data) - - def close(self): - hashsum = cryptoutils.get_lbry_hash_obj() - buffer = self.data.getvalue() - hashsum.update(buffer) - with open(hashsum.hexdigest(), 'w') as fout: - fout.write(buffer) - return defer.succeed(True) +def iv_generator(iv): + iv = int(iv, 16) + while 1: + iv += 1 + yield ("%016d" % iv)[-16:] if __name__ == '__main__': diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 9eb638fa0..10e598b9f 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -182,7 +182,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) return @@ -213,7 +213,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) d = send_to_server([x[0] for x in self.expected_blobs]) @@ -244,7 +244,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) d = send_to_server([x[0] for x in self.expected_blobs]) diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index af2197d0b..32103e374 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -53,7 +53,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_unavailable_when_blob_not_validated(self): blob = mock.Mock() - blob.is_validated.return_value = False + blob.get_is_verified.return_value = False self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { 'blob_data_payment_rate': 1.0, @@ -68,7 +68,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_unavailable_when_blob_cannot_be_opened(self): blob = mock.Mock() - blob.is_validated.return_value = True + blob.get_is_verified.return_value = True blob.open_for_reading.return_value = None self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { @@ -84,7 +84,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_details_are_set_when_all_conditions_are_met(self): blob = mock.Mock() - blob.is_validated.return_value = True + blob.get_is_verified.return_value = True 
blob.open_for_reading.return_value = True blob.blob_hash = 'DEADBEEF' blob.length = 42 diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index 1b7271dc2..f6b4a1f04 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -47,8 +47,8 @@ class BlobManagerTest(unittest.TestCase): yield self.bm.setup() blob = yield self.bm.get_blob(blob_hash,len(data)) - finished_d, write, cancel =yield blob.open_for_writing(self.peer) - yield write(data) + writer, finished_d = yield blob.open_for_writing(self.peer) + yield writer.write(data) yield self.bm.blob_completed(blob) yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) @@ -105,7 +105,7 @@ class BlobManagerTest(unittest.TestCase): # open the last blob blob = yield self.bm.get_blob(blob_hashes[-1]) - finished_d, write, cancel = yield blob.open_for_writing(self.peer) + writer, finished_d = yield blob.open_for_writing(self.peer) # delete the last blob and check if it still exists out = yield self.bm.delete_blobs([blob_hash]) @@ -114,4 +114,3 @@ class BlobManagerTest(unittest.TestCase): self.assertTrue(blob_hashes[-1] in blobs) self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1]))) - blob._close_writer(blob.writers[self.peer][0]) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py new file mode 100644 index 000000000..d1c282478 --- /dev/null +++ b/tests/unit/core/test_HashBlob.py @@ -0,0 +1,127 @@ +from lbrynet.blob import BlobFile +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError + + +from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash +from twisted.trial import unittest +from twisted.internet import defer +import os +import time + + +class BlobFileTest(unittest.TestCase): + def setUp(self): + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.fake_content_len = 64 + self.fake_content = bytearray('0'*self.fake_content_len) + self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105' + + def tearDown(self): + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + + @defer.inlineCallbacks + def test_good_write_and_read(self): + # test a write that should succeed + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertFalse(blob_file.verified) + + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) + writer.close() + out = yield finished_d + self.assertTrue(isinstance(out, BlobFile)) + self.assertTrue(out.verified) + self.assertEqual(self.fake_content_len, out.get_length()) + + # read from the instance used to write to, and verify content + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(c, self.fake_content) + self.assertFalse(out.is_downloading()) + + # read from newly declared instance, and verify content + del blob_file + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertTrue(blob_file.verified) + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(c, self.fake_content) + + @defer.inlineCallbacks + def test_delete(self): + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) + out = yield finished_d + out = yield blob_file.delete() + + blob_file = BlobFile(self.blob_dir, self.fake_content_hash) + 
self.assertFalse(blob_file.verified)
+
+    @defer.inlineCallbacks
+    def test_too_much_write(self):
+        # writing too much data should result in failure
+        expected_length = 16
+        content = bytearray('0'*32)
+        blob_hash = random_lbry_hash()
+        blob_file = BlobFile(self.blob_dir, blob_hash, expected_length)
+        writer, finished_d = blob_file.open_for_writing(peer=1)
+        writer.write(content)
+        out = yield self.assertFailure(finished_d, InvalidDataError)
+
+    @defer.inlineCallbacks
+    def test_bad_hash(self):
+        # test a write that should fail because its content's hash
+        # does not equal the blob_hash
+        length = 64
+        content = bytearray('0'*length)
+        blob_hash = random_lbry_hash()
+        blob_file = BlobFile(self.blob_dir, blob_hash, length)
+        writer, finished_d = blob_file.open_for_writing(peer=1)
+        writer.write(content)
+        yield self.assertFailure(finished_d, InvalidDataError)
+
+    @defer.inlineCallbacks
+    def test_close_on_incomplete_write(self):
+        # write all but 1 byte of data, then close the writer
+        blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
+        writer, finished_d = blob_file.open_for_writing(peer=1)
+        writer.write(self.fake_content[:self.fake_content_len-1])
+        writer.close()
+        yield self.assertFailure(finished_d, DownloadCanceledError)
+
+        # writes after close will raise an IOError
+        with self.assertRaises(IOError):
+            writer.write(self.fake_content)
+
+        # another call to close will do nothing
+        writer.close()
+
+        # file should not exist, since we did not finish the write
+        blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
+        out = blob_file_2.open_for_reading()
+        self.assertEqual(None, out)
+
+    @defer.inlineCallbacks
+    def test_multiple_writers(self):
+        # start the first writer and write halfway, then start a second writer and write everything
+        blob_hash = self.fake_content_hash
+        blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
+        writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
+        writer_1.write(self.fake_content[:self.fake_content_len/2])
+
+        writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
+        writer_2.write(self.fake_content)
+        out_2 = yield finished_d_2
+        out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError)
+
+        self.assertTrue(isinstance(out_2, BlobFile))
+        self.assertTrue(out_2.verified)
+        self.assertEqual(self.fake_content_len, out_2.get_length())
+
+        f = blob_file.open_for_reading()
+        c = f.read()
+        self.assertEqual(self.fake_content_len, len(c))
+        self.assertEqual(bytearray(c), self.fake_content)
+
+
diff --git a/tests/unit/cryptstream/test_cryptblob.py b/tests/unit/cryptstream/test_cryptblob.py
index cb1110d77..624f1a747 100644
--- a/tests/unit/cryptstream/test_cryptblob.py
+++ b/tests/unit/cryptstream/test_cryptblob.py
@@ -1,7 +1,6 @@
 from twisted.trial import unittest
 from twisted.internet import defer
 from lbrynet.cryptstream import CryptBlob
-from lbrynet.core.HashBlob import TempBlobCreator
 from lbrynet import conf

 from tests.mocks import mock_conf_settings
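Note: the patch above replaces the old `(finished_deferred, write_func, cancel_func)` triple returned by `open_for_writing` with a `(writer, finished_deferred)` pair. A minimal sketch of the new round-trip, not part of the patch itself; the integer peer id stands in for a real Peer object, as in the unit tests above, and `close_read_handle()` is assumed per the `BlobFile.open_for_reading` docstring:

    from twisted.internet import defer
    from lbrynet.blob import BlobFile

    @defer.inlineCallbacks
    def write_then_read(blob_dir, blob_hash, data):
        # blob_hash must be the sha384 hex digest of data, otherwise
        # finished_d will errback with InvalidDataError
        blob = BlobFile(blob_dir, blob_hash, len(data))
        writer, finished_d = blob.open_for_writing(peer=1)
        writer.write(data)
        writer.close()
        yield finished_d                  # fires with the verified BlobFile
        handle = blob.open_for_reading()  # returns None unless verified
        contents = handle.read()
        blob.close_read_handle(handle)    # release it so the blob can be deleted
        defer.returnValue(contents)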
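The stream-terminator check added to `EncryptedFileMetadataHandler._format_initial_blobs_for_download_manager` assumes each row of `blob_infos` is a `(blob_hash, blob_num, iv, length)` tuple and that only the last row may be the terminator. A hypothetical illustration (hash, iv, and length values are placeholders):

    blob_infos = [
        ('8a9d1e...', 0, '0000000000000001', 2097152),  # content blob
        ('f00dfe...', 1, '0000000000000002', 1024),     # last content blob
        (None,        2, '0000000000000003', 0),        # terminator row
    ]
    # A null or zero-length row anywhere but last now raises "Invalid stream
    # terminator", where the old code treated any null-hash row as the terminator
    # wherever it appeared.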
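One behavioral quirk of the `iv_generator` added in scripts/encrypt_blob.py: it parses its seed as hex but yields zero-padded decimal strings, so successive IVs are decimal increments of the seed's integer value, always 16 characters wide:

    gen = iv_generator('ff')  # int('ff', 16) == 255
    next(gen)                 # '0000000000000256'
    next(gen)                 # '0000000000000257'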