Merge pull request #931 from lbryio/fix_blob_reader_closing

Fixes to Blob (unify way to read from BlobFile, move conf.py BLOB_SIZE  )
This commit is contained in:
Umpei Kay Kurokawa 2017-10-31 15:44:23 -04:00 committed by GitHub
commit 62ec404acc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 47 additions and 84 deletions

View file

@ -22,7 +22,7 @@ at anytime.
* *
### Changed ### Changed
* * Moved BLOB_SIZE from conf.py to MAX_BLOB_SIZE in blob/blob_file.py
* *
### Added ### Added
@ -30,7 +30,7 @@ at anytime.
* *
### Removed ### Removed
* * Removed some alternate methods of reading from blob files
* *

View file

@ -1,18 +1,16 @@
import logging import logging
import os import os
from twisted.internet import defer, threads from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer from twisted.web.client import FileBodyProducer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.utils import is_valid_blobhash
from lbrynet.blob.writer import HashBlobWriter from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0 from lbrynet.blob.reader import HashBlobReader
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
MAX_BLOB_SIZE = 2 * 2 ** 20
class BlobFile(object): class BlobFile(object):
""" """
@ -78,7 +76,8 @@ class BlobFile(object):
finished finished
""" """
if self._verified is True: if self._verified is True:
reader = HashBlobReader(self.file_path, self.reader_finished) f = open(self.file_path, 'rb')
reader = HashBlobReader(f, self.reader_finished)
self.readers += 1 self.readers += 1
return reader return reader
return None return None
@ -124,7 +123,7 @@ class BlobFile(object):
def set_length(self, length): def set_length(self, length):
if self.length is not None and length == self.length: if self.length is not None and length == self.length:
return True return True
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']: if self.length is None and 0 <= length <= MAX_BLOB_SIZE:
self.length = length self.length = length
return True return True
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
@ -142,33 +141,6 @@ class BlobFile(object):
return True return True
return False return False
def read(self, write_func):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
def close_self(*args):
self.close_read_handle(file_handle)
return args[0]
file_sender = FileSender()
reader = HashBlobReader_v0(write_func)
file_handle = self.open_for_reading()
if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader)
d.addCallback(close_self)
else:
d = defer.fail(IOError("Could not read the blob"))
return d
def close_read_handle(self, file_handle):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
if file_handle is not None:
file_handle.close()
def reader_finished(self, reader): def reader_finished(self, reader):
self.readers -= 1 self.readers -= 1
return defer.succeed(True) return defer.succeed(True)

View file

@ -1,50 +1,22 @@
import logging import logging
from twisted.internet import interfaces
from zope.interface import implements
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class HashBlobReader_v0(object):
"""
This is a class that is only used in StreamBlobDecryptor
and should be deprecated
"""
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
class HashBlobReader(object): class HashBlobReader(object):
""" """
This is a file like reader class that supports This is a file like reader class that supports
read(size) and close() read(size) and close()
""" """
def __init__(self, file_path, finished_cb): def __init__(self, read_handle, finished_cb):
self.finished_cb = finished_cb self.finished_cb = finished_cb
self.finished_cb_d = None self.finished_cb_d = None
self.read_handle = open(file_path, 'rb') self.read_handle = read_handle
def __del__(self): def __del__(self):
if self.finished_cb_d is None:
log.warn("Garbage collection was called, but reader for %s was not closed yet",
self.read_handle.name)
self.close() self.close()
def read(self, size=-1): def read(self, size=-1):

View file

@ -16,6 +16,11 @@ class HashBlobWriter(object):
self._hashsum = get_lbry_hash_obj() self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0 self.len_so_far = 0
def __del__(self):
if self.finished_cb_d is None:
log.warn("Garbage collection was called, but writer was not closed yet")
self.close()
@property @property
def blob_hash(self): def blob_hash(self):
return self._hashsum.hexdigest() return self._hashsum.hexdigest()

View file

@ -206,7 +206,6 @@ FIXED_SETTINGS = {
'API_ADDRESS': 'lbryapi', 'API_ADDRESS': 'lbryapi',
'APP_NAME': APP_NAME, 'APP_NAME': APP_NAME,
'BLOBFILES_DIR': 'blobfiles', 'BLOBFILES_DIR': 'blobfiles',
'BLOB_SIZE': 2 * MB,
'CRYPTSD_FILE_EXTENSION': '.cryptsd', 'CRYPTSD_FILE_EXTENSION': '.cryptsd',
'CURRENCIES': { 'CURRENCIES': {
'BTC': {'type': 'crypto'}, 'BTC': {'type': 'crypto'},

View file

@ -1,5 +1,4 @@
from lbrynet import conf from lbrynet.blob.blob_file import MAX_BLOB_SIZE
class ClientRequest(object): class ClientRequest(object):
def __init__(self, request_dict, response_identifier=None): def __init__(self, request_dict, response_identifier=None):
@ -17,7 +16,7 @@ class ClientBlobRequest(ClientPaidRequest):
def __init__(self, request_dict, response_identifier, write_func, finished_deferred, def __init__(self, request_dict, response_identifier, write_func, finished_deferred,
cancel_func, blob): cancel_func, blob):
if blob.length is None: if blob.length is None:
max_pay_units = conf.settings['BLOB_SIZE'] max_pay_units = MAX_BLOB_SIZE
else: else:
max_pay_units = blob.length max_pay_units = blob.length
ClientPaidRequest.__init__(self, request_dict, response_identifier, max_pay_units) ClientPaidRequest.__init__(self, request_dict, response_identifier, max_pay_units)

View file

@ -1,13 +1,14 @@
import binascii import binascii
import logging import logging
from io import BytesIO
from twisted.internet import defer from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from cryptography.hazmat.primitives.ciphers import Cipher, modes from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7 from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from lbrynet import conf
from lbrynet.core.BlobInfo import BlobInfo from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
backend = default_backend() backend = default_backend()
@ -46,6 +47,10 @@ class StreamBlobDecryptor(object):
write_func - function that takes decrypted string as write_func - function that takes decrypted string as
arugment and writes it somewhere arugment and writes it somewhere
Returns:
deferred that returns after decrypting blob and writing content
""" """
def remove_padding(data): def remove_padding(data):
@ -67,13 +72,20 @@ class StreamBlobDecryptor(object):
last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize()
write_func(remove_padding(last_chunk)) write_func(remove_padding(last_chunk))
def decrypt_bytes(data):
self.buff += data
self.len_read += len(data)
write_bytes()
d = self.blob.read(decrypt_bytes) read_handle = self.blob.open_for_reading()
d.addCallback(lambda _: finish_decrypt())
@defer.inlineCallbacks
def decrypt_bytes():
producer = FileBodyProducer(read_handle)
buff = BytesIO()
yield producer.startProducing(buff)
self.buff = buff.getvalue()
self.len_read += len(self.buff)
write_bytes()
finish_decrypt()
d = decrypt_bytes()
return d return d
@ -106,7 +118,7 @@ class CryptStreamBlobMaker(object):
max bytes are written. num_bytes_to_write is the number max bytes are written. num_bytes_to_write is the number
of bytes that will be written from data in this call of bytes that will be written from data in this call
""" """
max_bytes_to_write = conf.settings['BLOB_SIZE'] - self.length - 1 max_bytes_to_write = MAX_BLOB_SIZE - self.length - 1
done = False done = False
if max_bytes_to_write <= len(data): if max_bytes_to_write <= len(data):
num_bytes_to_write = max_bytes_to_write num_bytes_to_write = max_bytes_to_write

View file

@ -1,7 +1,7 @@
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
from lbrynet.cryptstream import CryptBlob from lbrynet.cryptstream import CryptBlob
from lbrynet import conf from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from lbrynet.tests.mocks import mock_conf_settings from lbrynet.tests.mocks import mock_conf_settings
@ -9,6 +9,7 @@ from Crypto import Random
from Crypto.Cipher import AES from Crypto.Cipher import AES
import random import random
import string import string
import StringIO
class MocBlob(object): class MocBlob(object):
def __init__(self): def __init__(self):
@ -19,6 +20,9 @@ class MocBlob(object):
write_func(data) write_func(data)
return defer.succeed(True) return defer.succeed(True)
def open_for_reading(self):
return StringIO.StringIO(self.data)
def write(self, data): def write(self, data):
self.data += data self.data += data
@ -53,7 +57,7 @@ class TestCryptBlob(unittest.TestCase):
expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size
self.assertEqual(expected_encrypted_blob_size, len(blob.data)) self.assertEqual(expected_encrypted_blob_size, len(blob.data))
if size_of_data < conf.settings['BLOB_SIZE']-1: if size_of_data < MAX_BLOB_SIZE-1:
self.assertFalse(done) self.assertFalse(done)
else: else:
self.assertTrue(done) self.assertTrue(done)
@ -64,7 +68,7 @@ class TestCryptBlob(unittest.TestCase):
# decrypt string # decrypt string
decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data) decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data)
decryptor.decrypt(write_func) yield decryptor.decrypt(write_func)
self.assertEqual(self.data_buf, string_to_encrypt) self.assertEqual(self.data_buf, string_to_encrypt)
@defer.inlineCallbacks @defer.inlineCallbacks