From 3479c6ea8db9f3fc7fffc75617acd009fbe548c1 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Mon, 26 Jun 2017 17:50:51 -0400 Subject: [PATCH] add docstring and unit test for CryptBlob.py --- lbrynet/cryptstream/CryptBlob.py | 33 +++++++++- tests/unit/cryptstream/__init__.py | 0 tests/unit/cryptstream/test_cryptblob.py | 79 ++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 tests/unit/cryptstream/__init__.py create mode 100644 tests/unit/cryptstream/test_cryptblob.py diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index 1aa2f4f1f..0b26d6f89 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -16,6 +16,15 @@ class CryptBlobInfo(BlobInfo): class StreamBlobDecryptor(object): def __init__(self, blob, key, iv, length): + """ + This class decrypts blob + + blob - object which implements read() function. + key = encryption_key + iv = initialization vector + blob_num = blob number (has no effect on encryption) + length = length in bytes of blob + """ self.blob = blob self.key = key self.iv = iv @@ -25,6 +34,12 @@ class StreamBlobDecryptor(object): self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) def decrypt(self, write_func): + """ + Decrypt blob and write its content useing write_func + + write_func - function that takes decrypted string as + arugment and writes it somewhere + """ def remove_padding(data): pad_len = ord(data[-1]) @@ -55,8 +70,16 @@ class StreamBlobDecryptor(object): class CryptStreamBlobMaker(object): - """This class encrypts data and writes it to a new blob""" def __init__(self, key, iv, blob_num, blob): + """ + This class encrypts data and writes it to a new blob + + key = encryption_key + iv = initialization vector + blob_num = blob number (has no effect on encryption) + blob = object which implements write(), close() function , close() function must + be a deferred. (Will generally be of HashBlobCreator type) + """ self.key = key self.iv = iv self.blob_num = blob_num @@ -66,6 +89,14 @@ class CryptStreamBlobMaker(object): self.length = 0 def write(self, data): + """ + encrypt and write string data + + Returns: + tuple (done, num_bytes_to_write) where done is True if + max bytes are written. num_bytes_to_write is the number + of bytes that will be written from data in this call + """ max_bytes_to_write = conf.settings['BLOB_SIZE'] - self.length - 1 done = False if max_bytes_to_write <= len(data): diff --git a/tests/unit/cryptstream/__init__.py b/tests/unit/cryptstream/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/cryptstream/test_cryptblob.py b/tests/unit/cryptstream/test_cryptblob.py new file mode 100644 index 000000000..cb1110d77 --- /dev/null +++ b/tests/unit/cryptstream/test_cryptblob.py @@ -0,0 +1,79 @@ +from twisted.trial import unittest +from twisted.internet import defer +from lbrynet.cryptstream import CryptBlob +from lbrynet.core.HashBlob import TempBlobCreator +from lbrynet import conf + +from tests.mocks import mock_conf_settings + +from Crypto import Random +from Crypto.Cipher import AES +import tempfile +import random +import string +import StringIO +import time + +class MocBlob(object): + def __init__(self): + self.data = '' + + def read(self, write_func): + data = self.data + write_func(data) + return defer.succeed(True) + + def write(self, data): + self.data += data + + def close(self): + return defer.succeed(True) + + +def random_string(length): + return ''.join(random.choice(string.lowercase) for i in range(length)) + + +class TestCryptBlob(unittest.TestCase): + def setUp(self): + mock_conf_settings(self) + + + @defer.inlineCallbacks + def _test_encrypt_decrypt(self, size_of_data): + # max blob size is 2*2**20 -1 ( -1 due to required padding in the end ) + blob = MocBlob() + blob_num = 0 + key = Random.new().read(AES.block_size) + iv = Random.new().read(AES.block_size) + maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob) + write_size = size_of_data + string_to_encrypt = random_string(size_of_data) + + # encrypt string + done,num_bytes = maker.write(string_to_encrypt) + yield maker.close() + self.assertEqual(size_of_data,num_bytes) + expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size + self.assertEqual(expected_encrypted_blob_size, len(blob.data)) + + if size_of_data < conf.settings['BLOB_SIZE']-1: + self.assertFalse(done) + else: + self.assertTrue(done) + self.data_buf = '' + + def write_func(data): + self.data_buf += data + + # decrypt string + decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data) + decryptor.decrypt(write_func) + self.assertEqual(self.data_buf,string_to_encrypt) + + @defer.inlineCallbacks + def test_encrypt_decrypt(self): + yield self._test_encrypt_decrypt(1) + yield self._test_encrypt_decrypt(16*2) + yield self._test_encrypt_decrypt(2000) + yield self._test_encrypt_decrypt(2*2**20-1)