Merge pull request #733 from lbryio/crypstream_tests

Add docstring and unit test for CryptBlob.py
This commit is contained in:
Umpei Kay Kurokawa 2017-06-28 14:45:53 -04:00 committed by GitHub
commit f06366b7dd
4 changed files with 112 additions and 1 deletions

View file

@ -11,6 +11,7 @@ at anytime.
### Added ### Added
* Missing docstring for `blob_list` * Missing docstring for `blob_list`
* Added convenient import for setting up a daemon client, `from lbrynet.daemon import get_client` * Added convenient import for setting up a daemon client, `from lbrynet.daemon import get_client`
* Added unit tests for CryptBlob.py
### Changed ### Changed
* Change `max_key_fee` setting to be a dictionary with values for `currency` and `amount` * Change `max_key_fee` setting to be a dictionary with values for `currency` and `amount`

View file

@ -16,6 +16,15 @@ class CryptBlobInfo(BlobInfo):
class StreamBlobDecryptor(object): class StreamBlobDecryptor(object):
def __init__(self, blob, key, iv, length): def __init__(self, blob, key, iv, length):
"""
This class decrypts blob
blob - object which implements read() function.
key = encryption_key
iv = initialization vector
blob_num = blob number (has no effect on encryption)
length = length in bytes of blob
"""
self.blob = blob self.blob = blob
self.key = key self.key = key
self.iv = iv self.iv = iv
@ -25,6 +34,12 @@ class StreamBlobDecryptor(object):
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
def decrypt(self, write_func): def decrypt(self, write_func):
"""
Decrypt blob and write its content useing write_func
write_func - function that takes decrypted string as
arugment and writes it somewhere
"""
def remove_padding(data): def remove_padding(data):
pad_len = ord(data[-1]) pad_len = ord(data[-1])
@ -55,8 +70,16 @@ class StreamBlobDecryptor(object):
class CryptStreamBlobMaker(object): class CryptStreamBlobMaker(object):
"""This class encrypts data and writes it to a new blob"""
def __init__(self, key, iv, blob_num, blob): def __init__(self, key, iv, blob_num, blob):
"""
This class encrypts data and writes it to a new blob
key = encryption_key
iv = initialization vector
blob_num = blob number (has no effect on encryption)
blob = object which implements write(), close() function , close() function must
be a deferred. (Will generally be of HashBlobCreator type)
"""
self.key = key self.key = key
self.iv = iv self.iv = iv
self.blob_num = blob_num self.blob_num = blob_num
@ -66,6 +89,14 @@ class CryptStreamBlobMaker(object):
self.length = 0 self.length = 0
def write(self, data): def write(self, data):
"""
encrypt and write string data
Returns:
tuple (done, num_bytes_to_write) where done is True if
max bytes are written. num_bytes_to_write is the number
of bytes that will be written from data in this call
"""
max_bytes_to_write = conf.settings['BLOB_SIZE'] - self.length - 1 max_bytes_to_write = conf.settings['BLOB_SIZE'] - self.length - 1
done = False done = False
if max_bytes_to_write <= len(data): if max_bytes_to_write <= len(data):

View file

View file

@ -0,0 +1,79 @@
from twisted.trial import unittest
from twisted.internet import defer
from lbrynet.cryptstream import CryptBlob
from lbrynet.core.HashBlob import TempBlobCreator
from lbrynet import conf
from tests.mocks import mock_conf_settings
from Crypto import Random
from Crypto.Cipher import AES
import tempfile
import random
import string
import StringIO
import time
class MocBlob(object):
def __init__(self):
self.data = ''
def read(self, write_func):
data = self.data
write_func(data)
return defer.succeed(True)
def write(self, data):
self.data += data
def close(self):
return defer.succeed(True)
def random_string(length):
return ''.join(random.choice(string.lowercase) for i in range(length))
class TestCryptBlob(unittest.TestCase):
def setUp(self):
mock_conf_settings(self)
@defer.inlineCallbacks
def _test_encrypt_decrypt(self, size_of_data):
# max blob size is 2*2**20 -1 ( -1 due to required padding in the end )
blob = MocBlob()
blob_num = 0
key = Random.new().read(AES.block_size)
iv = Random.new().read(AES.block_size)
maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob)
write_size = size_of_data
string_to_encrypt = random_string(size_of_data)
# encrypt string
done,num_bytes = maker.write(string_to_encrypt)
yield maker.close()
self.assertEqual(size_of_data,num_bytes)
expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size
self.assertEqual(expected_encrypted_blob_size, len(blob.data))
if size_of_data < conf.settings['BLOB_SIZE']-1:
self.assertFalse(done)
else:
self.assertTrue(done)
self.data_buf = ''
def write_func(data):
self.data_buf += data
# decrypt string
decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data)
decryptor.decrypt(write_func)
self.assertEqual(self.data_buf,string_to_encrypt)
@defer.inlineCallbacks
def test_encrypt_decrypt(self):
yield self._test_encrypt_decrypt(1)
yield self._test_encrypt_decrypt(16*2)
yield self._test_encrypt_decrypt(2000)
yield self._test_encrypt_decrypt(2*2**20-1)