diff --git a/CHANGELOG.md b/CHANGELOG.md index 820e64931..59498f825 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ at anytime. * ### Changed - * + * Moved BLOB_SIZE from conf.py to MAX_BLOB_SIZE in blob/blob_file.py * ### Added @@ -30,7 +30,7 @@ at anytime. * ### Removed - * + * Removed some alternate methods of reading from blob files * diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 78cf974ad..42b402030 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -1,18 +1,16 @@ import logging import os from twisted.internet import defer, threads -from twisted.protocols.basic import FileSender from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure -from lbrynet import conf from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.utils import is_valid_blobhash from lbrynet.blob.writer import HashBlobWriter -from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0 - +from lbrynet.blob.reader import HashBlobReader log = logging.getLogger(__name__) +MAX_BLOB_SIZE = 2 * 2 ** 20 class BlobFile(object): """ @@ -78,7 +76,8 @@ class BlobFile(object): finished """ if self._verified is True: - reader = HashBlobReader(self.file_path, self.reader_finished) + f = open(self.file_path, 'rb') + reader = HashBlobReader(f, self.reader_finished) self.readers += 1 return reader return None @@ -124,7 +123,7 @@ class BlobFile(object): def set_length(self, length): if self.length is not None and length == self.length: return True - if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']: + if self.length is None and 0 <= length <= MAX_BLOB_SIZE: self.length = length return True log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", @@ -142,33 +141,6 @@ class BlobFile(object): return True return False - def read(self, write_func): - """ - This function is only used in StreamBlobDecryptor - and should be deprecated in favor of open_for_reading() - """ - def close_self(*args): - self.close_read_handle(file_handle) - return args[0] - - file_sender = FileSender() - reader = HashBlobReader_v0(write_func) - file_handle = self.open_for_reading() - if file_handle is not None: - d = file_sender.beginFileTransfer(file_handle, reader) - d.addCallback(close_self) - else: - d = defer.fail(IOError("Could not read the blob")) - return d - - def close_read_handle(self, file_handle): - """ - This function is only used in StreamBlobDecryptor - and should be deprecated in favor of open_for_reading() - """ - if file_handle is not None: - file_handle.close() - def reader_finished(self, reader): self.readers -= 1 return defer.succeed(True) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index dd248c8fd..afd62e57e 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -1,50 +1,22 @@ import logging -from twisted.internet import interfaces -from zope.interface import implements log = logging.getLogger(__name__) -class HashBlobReader_v0(object): - """ - This is a class that is only used in StreamBlobDecryptor - and should be deprecated - """ - implements(interfaces.IConsumer) - - def __init__(self, write_func): - self.write_func = write_func - - def registerProducer(self, producer, streaming): - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - pass - - def write(self, data): - from twisted.internet import reactor - - self.write_func(data) - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - class HashBlobReader(object): """ This is a file like reader class that supports read(size) and close() """ - def __init__(self, file_path, finished_cb): + def __init__(self, read_handle, finished_cb): self.finished_cb = finished_cb self.finished_cb_d = None - self.read_handle = open(file_path, 'rb') + self.read_handle = read_handle def __del__(self): + if self.finished_cb_d is None: + log.warn("Garbage collection was called, but reader for %s was not closed yet", + self.read_handle.name) self.close() def read(self, size=-1): diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py index a95430386..dc4d3d77a 100644 --- a/lbrynet/blob/writer.py +++ b/lbrynet/blob/writer.py @@ -16,6 +16,11 @@ class HashBlobWriter(object): self._hashsum = get_lbry_hash_obj() self.len_so_far = 0 + def __del__(self): + if self.finished_cb_d is None: + log.warn("Garbage collection was called, but writer was not closed yet") + self.close() + @property def blob_hash(self): return self._hashsum.hexdigest() diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 4b156ad89..cc290a3bf 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -206,7 +206,6 @@ FIXED_SETTINGS = { 'API_ADDRESS': 'lbryapi', 'APP_NAME': APP_NAME, 'BLOBFILES_DIR': 'blobfiles', - 'BLOB_SIZE': 2 * MB, 'CRYPTSD_FILE_EXTENSION': '.cryptsd', 'CURRENCIES': { 'BTC': {'type': 'crypto'}, diff --git a/lbrynet/core/client/ClientRequest.py b/lbrynet/core/client/ClientRequest.py index 04c3dac7f..1dee9b9d6 100644 --- a/lbrynet/core/client/ClientRequest.py +++ b/lbrynet/core/client/ClientRequest.py @@ -1,5 +1,4 @@ -from lbrynet import conf - +from lbrynet.blob.blob_file import MAX_BLOB_SIZE class ClientRequest(object): def __init__(self, request_dict, response_identifier=None): @@ -17,7 +16,7 @@ class ClientBlobRequest(ClientPaidRequest): def __init__(self, request_dict, response_identifier, write_func, finished_deferred, cancel_func, blob): if blob.length is None: - max_pay_units = conf.settings['BLOB_SIZE'] + max_pay_units = MAX_BLOB_SIZE else: max_pay_units = blob.length ClientPaidRequest.__init__(self, request_dict, response_identifier, max_pay_units) diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index a7303a588..c99465673 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -1,13 +1,14 @@ import binascii import logging +from io import BytesIO from twisted.internet import defer +from twisted.web.client import FileBodyProducer from cryptography.hazmat.primitives.ciphers import Cipher, modes from cryptography.hazmat.primitives.ciphers.algorithms import AES from cryptography.hazmat.primitives.padding import PKCS7 from cryptography.hazmat.backends import default_backend -from lbrynet import conf from lbrynet.core.BlobInfo import BlobInfo - +from lbrynet.blob.blob_file import MAX_BLOB_SIZE log = logging.getLogger(__name__) backend = default_backend() @@ -46,6 +47,10 @@ class StreamBlobDecryptor(object): write_func - function that takes decrypted string as arugment and writes it somewhere + + Returns: + + deferred that returns after decrypting blob and writing content """ def remove_padding(data): @@ -67,13 +72,20 @@ class StreamBlobDecryptor(object): last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() write_func(remove_padding(last_chunk)) - def decrypt_bytes(data): - self.buff += data - self.len_read += len(data) - write_bytes() - d = self.blob.read(decrypt_bytes) - d.addCallback(lambda _: finish_decrypt()) + read_handle = self.blob.open_for_reading() + + @defer.inlineCallbacks + def decrypt_bytes(): + producer = FileBodyProducer(read_handle) + buff = BytesIO() + yield producer.startProducing(buff) + self.buff = buff.getvalue() + self.len_read += len(self.buff) + write_bytes() + finish_decrypt() + + d = decrypt_bytes() return d @@ -106,7 +118,7 @@ class CryptStreamBlobMaker(object): max bytes are written. num_bytes_to_write is the number of bytes that will be written from data in this call """ - max_bytes_to_write = conf.settings['BLOB_SIZE'] - self.length - 1 + max_bytes_to_write = MAX_BLOB_SIZE - self.length - 1 done = False if max_bytes_to_write <= len(data): num_bytes_to_write = max_bytes_to_write diff --git a/lbrynet/tests/unit/cryptstream/test_cryptblob.py b/lbrynet/tests/unit/cryptstream/test_cryptblob.py index 083d2e1fc..2378c5770 100644 --- a/lbrynet/tests/unit/cryptstream/test_cryptblob.py +++ b/lbrynet/tests/unit/cryptstream/test_cryptblob.py @@ -1,7 +1,7 @@ from twisted.trial import unittest from twisted.internet import defer from lbrynet.cryptstream import CryptBlob -from lbrynet import conf +from lbrynet.blob.blob_file import MAX_BLOB_SIZE from lbrynet.tests.mocks import mock_conf_settings @@ -9,6 +9,7 @@ from Crypto import Random from Crypto.Cipher import AES import random import string +import StringIO class MocBlob(object): def __init__(self): @@ -19,6 +20,9 @@ class MocBlob(object): write_func(data) return defer.succeed(True) + def open_for_reading(self): + return StringIO.StringIO(self.data) + def write(self, data): self.data += data @@ -53,7 +57,7 @@ class TestCryptBlob(unittest.TestCase): expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size self.assertEqual(expected_encrypted_blob_size, len(blob.data)) - if size_of_data < conf.settings['BLOB_SIZE']-1: + if size_of_data < MAX_BLOB_SIZE-1: self.assertFalse(done) else: self.assertTrue(done) @@ -64,7 +68,7 @@ class TestCryptBlob(unittest.TestCase): # decrypt string decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data) - decryptor.decrypt(write_func) + yield decryptor.decrypt(write_func) self.assertEqual(self.data_buf, string_to_encrypt) @defer.inlineCallbacks