forked from LBRYCommunity/lbry-sdk
port cryptblob test to py3
This commit is contained in:
parent
315661208d
commit
39d7f2e46e
1 changed files with 10 additions and 8 deletions
|
@ -8,14 +8,14 @@ from tests.mocks import mock_conf_settings
|
||||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
import StringIO
|
from six import BytesIO
|
||||||
import os
|
import os
|
||||||
|
|
||||||
AES_BLOCK_SIZE_BYTES = AES.block_size / 8
|
AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8)
|
||||||
|
|
||||||
class MocBlob(object):
|
class MocBlob(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.data = ''
|
self.data = b''
|
||||||
|
|
||||||
def read(self, write_func):
|
def read(self, write_func):
|
||||||
data = self.data
|
data = self.data
|
||||||
|
@ -23,9 +23,11 @@ class MocBlob(object):
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
def open_for_reading(self):
|
def open_for_reading(self):
|
||||||
return StringIO.StringIO(self.data)
|
return BytesIO(self.data)
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data):
|
||||||
|
if not isinstance(data, bytes):
|
||||||
|
data = data.encode()
|
||||||
self.data += data
|
self.data += data
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
@ -33,7 +35,7 @@ class MocBlob(object):
|
||||||
|
|
||||||
|
|
||||||
def random_string(length):
|
def random_string(length):
|
||||||
return ''.join(random.choice(string.lowercase) for i in range(length))
|
return ''.join(random.choice(string.ascii_lowercase) for i in range(length))
|
||||||
|
|
||||||
|
|
||||||
class TestCryptBlob(unittest.TestCase):
|
class TestCryptBlob(unittest.TestCase):
|
||||||
|
@ -50,20 +52,20 @@ class TestCryptBlob(unittest.TestCase):
|
||||||
iv = os.urandom(AES_BLOCK_SIZE_BYTES)
|
iv = os.urandom(AES_BLOCK_SIZE_BYTES)
|
||||||
maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob)
|
maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob)
|
||||||
write_size = size_of_data
|
write_size = size_of_data
|
||||||
string_to_encrypt = random_string(size_of_data)
|
string_to_encrypt = random_string(size_of_data).encode()
|
||||||
|
|
||||||
# encrypt string
|
# encrypt string
|
||||||
done, num_bytes = maker.write(string_to_encrypt)
|
done, num_bytes = maker.write(string_to_encrypt)
|
||||||
yield maker.close()
|
yield maker.close()
|
||||||
self.assertEqual(size_of_data, num_bytes)
|
self.assertEqual(size_of_data, num_bytes)
|
||||||
expected_encrypted_blob_size = ((size_of_data / AES_BLOCK_SIZE_BYTES) + 1) * AES_BLOCK_SIZE_BYTES
|
expected_encrypted_blob_size = int((size_of_data / AES_BLOCK_SIZE_BYTES) + 1) * AES_BLOCK_SIZE_BYTES
|
||||||
self.assertEqual(expected_encrypted_blob_size, len(blob.data))
|
self.assertEqual(expected_encrypted_blob_size, len(blob.data))
|
||||||
|
|
||||||
if size_of_data < MAX_BLOB_SIZE-1:
|
if size_of_data < MAX_BLOB_SIZE-1:
|
||||||
self.assertFalse(done)
|
self.assertFalse(done)
|
||||||
else:
|
else:
|
||||||
self.assertTrue(done)
|
self.assertTrue(done)
|
||||||
self.data_buf = ''
|
self.data_buf = b''
|
||||||
|
|
||||||
def write_func(data):
|
def write_func(data):
|
||||||
self.data_buf += data
|
self.data_buf += data
|
||||||
|
|
Loading…
Reference in a new issue