replace pycrypto->cryptography on tests

This commit is contained in:
Victor Shyba 2018-05-04 22:12:43 -03:00
parent cc34c4b95f
commit b389e59452
5 changed files with 32 additions and 28 deletions

View file

@ -7,8 +7,7 @@ import sys
import random import random
import unittest import unittest
from Crypto import Random from hashlib import md5
from Crypto.Hash import MD5
from lbrynet import conf from lbrynet import conf
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
@ -98,9 +97,6 @@ class LbryUploader(object):
from twisted.internet import reactor from twisted.internet import reactor
self.reactor = reactor self.reactor = reactor
logging.debug("Starting the uploader") logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_uploader")
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 1) peer_finder = FakePeerFinder(5553, peer_manager, 1)
@ -191,10 +187,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
logging.debug("Starting the uploader") logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_reuploader")
wallet = FakeWallet() wallet = FakeWallet()
peer_port = 5553 + n peer_port = 5553 + n
@ -297,7 +289,6 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
logging.debug("Starting the uploader") logging.debug("Starting the uploader")
Random.atfork()
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
@ -515,7 +506,7 @@ class TestTransfer(TestCase):
def check_md5_sum(): def check_md5_sum():
f = open(os.path.join(db_dir, 'test_file')) f = open(os.path.join(db_dir, 'test_file'))
hashsum = MD5.new() hashsum = md5()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
@ -688,7 +679,7 @@ class TestTransfer(TestCase):
def check_md5_sum(): def check_md5_sum():
f = open(os.path.join(db_dir, 'test_file')) f = open(os.path.join(db_dir, 'test_file'))
hashsum = MD5.new() hashsum = md5()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
@ -811,7 +802,7 @@ class TestTransfer(TestCase):
def check_md5_sum(): def check_md5_sum():
f = open('test_file') f = open('test_file')
hashsum = MD5.new() hashsum = md5()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "e5941d615f53312fd66638239c1f90d5") self.assertEqual(hashsum.hexdigest(), "e5941d615f53312fd66638239c1f90d5")

View file

@ -2,7 +2,7 @@ import os
import shutil import shutil
import tempfile import tempfile
from Crypto.Hash import MD5 from hashlib import md5
from twisted.trial.unittest import TestCase from twisted.trial.unittest import TestCase
from twisted.internet import defer, threads from twisted.internet import defer, threads
@ -127,7 +127,7 @@ class TestStreamify(TestCase):
self.assertTrue(lbry_file.sd_hash, sd_hash) self.assertTrue(lbry_file.sd_hash, sd_hash)
yield lbry_file.start() yield lbry_file.start()
f = open('test_file') f = open('test_file')
hashsum = MD5.new() hashsum = md5()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")

View file

@ -1,7 +1,10 @@
import base64
import struct import struct
import io import io
from Crypto.PublicKey import RSA from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from twisted.internet import defer, error from twisted.internet import defer, error
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -15,6 +18,12 @@ from lbrynet import conf
from util import debug_kademlia_packet from util import debug_kademlia_packet
KB = 2**10 KB = 2**10
PUBLIC_EXPOENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
def decode_rsa_key(pem_key):
decoded = base64.b64decode(''.join(pem_key.splitlines()[1:-1]))
return serialization.load_der_public_key(decoded, default_backend())
class FakeLBRYFile(object): class FakeLBRYFile(object):
@ -137,9 +146,9 @@ class PointTraderKeyQueryHandler(object):
if self.query_identifiers[0] in queries: if self.query_identifiers[0] in queries:
new_encoded_pub_key = queries[self.query_identifiers[0]] new_encoded_pub_key = queries[self.query_identifiers[0]]
try: try:
RSA.importKey(new_encoded_pub_key) decode_rsa_key(new_encoded_pub_key)
except (ValueError, TypeError, IndexError): except (ValueError, TypeError, IndexError):
return defer.fail(Failure(ValueError("Client sent an invalid public key"))) return defer.fail(Failure(ValueError("Client sent an invalid public key: {}".format(new_encoded_pub_key))))
self.public_key = new_encoded_pub_key self.public_key = new_encoded_pub_key
self.wallet.set_public_key_for_peer(self.peer, self.public_key) self.wallet.set_public_key_for_peer(self.peer, self.public_key)
fields = {'public_key': self.wallet.encoded_public_key} fields = {'public_key': self.wallet.encoded_public_key}
@ -152,8 +161,10 @@ class PointTraderKeyQueryHandler(object):
class Wallet(object): class Wallet(object):
def __init__(self): def __init__(self):
self.private_key = RSA.generate(1024) self.private_key = rsa.generate_private_key(public_exponent=PUBLIC_EXPOENT,
self.encoded_public_key = self.private_key.publickey().exportKey() key_size=1024, backend=default_backend())
self.encoded_public_key = self.private_key.public_key().public_bytes(serialization.Encoding.PEM,
serialization.PublicFormat.PKCS1)
self._config = None self._config = None
self.network = None self.network = None
self.wallet = None self.wallet = None

View file

@ -5,11 +5,13 @@ from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from lbrynet.tests.mocks import mock_conf_settings from lbrynet.tests.mocks import mock_conf_settings
from Crypto import Random from cryptography.hazmat.primitives.ciphers.algorithms import AES
from Crypto.Cipher import AES
import random import random
import string import string
import StringIO import StringIO
import os
AES_BLOCK_SIZE_BYTES = AES.block_size / 8
class MocBlob(object): class MocBlob(object):
def __init__(self): def __init__(self):
@ -44,8 +46,8 @@ class TestCryptBlob(unittest.TestCase):
# max blob size is 2*2**20 -1 ( -1 due to required padding in the end ) # max blob size is 2*2**20 -1 ( -1 due to required padding in the end )
blob = MocBlob() blob = MocBlob()
blob_num = 0 blob_num = 0
key = Random.new().read(AES.block_size) key = os.urandom(AES_BLOCK_SIZE_BYTES)
iv = Random.new().read(AES.block_size) iv = os.urandom(AES_BLOCK_SIZE_BYTES)
maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob) maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob)
write_size = size_of_data write_size = size_of_data
string_to_encrypt = random_string(size_of_data) string_to_encrypt = random_string(size_of_data)
@ -54,7 +56,7 @@ class TestCryptBlob(unittest.TestCase):
done, num_bytes = maker.write(string_to_encrypt) done, num_bytes = maker.write(string_to_encrypt)
yield maker.close() yield maker.close()
self.assertEqual(size_of_data, num_bytes) self.assertEqual(size_of_data, num_bytes)
expected_encrypted_blob_size = ((size_of_data / AES.block_size) + 1) * AES.block_size expected_encrypted_blob_size = ((size_of_data / AES_BLOCK_SIZE_BYTES) + 1) * AES_BLOCK_SIZE_BYTES
self.assertEqual(expected_encrypted_blob_size, len(blob.data)) self.assertEqual(expected_encrypted_blob_size, len(blob.data))
if size_of_data < MAX_BLOB_SIZE-1: if size_of_data < MAX_BLOB_SIZE-1:

View file

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from Crypto.Cipher import AES from cryptography.hazmat.primitives.ciphers.algorithms import AES
import mock import mock
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
@ -18,7 +18,7 @@ MB = 2**20
def iv_generator(): def iv_generator():
while True: while True:
yield '3' * AES.block_size yield '3' * (AES.block_size / 8)
class CreateEncryptedFileTest(unittest.TestCase): class CreateEncryptedFileTest(unittest.TestCase):
@ -47,7 +47,7 @@ class CreateEncryptedFileTest(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def create_file(self, filename): def create_file(self, filename):
handle = mocks.GenFile(3*MB, '1') handle = mocks.GenFile(3*MB, '1')
key = '2'*AES.block_size key = '2' * (AES.block_size / 8)
out = yield EncryptedFileCreator.create_lbry_file(self.session, self.file_manager, filename, handle, out = yield EncryptedFileCreator.create_lbry_file(self.session, self.file_manager, filename, handle,
key, iv_generator()) key, iv_generator())
defer.returnValue(out) defer.returnValue(out)