forked from LBRYCommunity/lbry-sdk
10b34d6b33
Instead of a recursive bytes2unicode, use a proper JSONEncoder to convert bytes->unicode for json.dumps(); removed excessive isinstance(data, bytes) checks; py3: / -> // and list() around .items() that gets modified in a loop; moved lbrynet.undecorated to where it's actually used (hopefully we can delete it eventually); removed build/upload_assets.py, since travis can do all this now.
113 lines
4.9 KiB
Python
113 lines
4.9 KiB
Python
import json
|
|
import mock
|
|
from twisted.trial import unittest
|
|
from twisted.internet import defer
|
|
|
|
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
|
from lbrynet.database.storage import SQLiteStorage
|
|
from lbrynet.core.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader
|
|
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
|
|
from lbrynet.core.BlobManager import DiskBlobManager
|
|
from lbrynet.core.PeerManager import PeerManager
|
|
from lbrynet.core.RateLimiter import DummyRateLimiter
|
|
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
|
|
from lbrynet.database.storage import SQLiteStorage
|
|
from lbrynet.file_manager import EncryptedFileCreator
|
|
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
|
|
from lbrynet.core.StreamDescriptor import JSONBytesEncoder
|
|
from tests import mocks
|
|
from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir
|
|
|
|
|
|
# Local aliases for objects provided by ``tests.mocks``.
FakeNode = mocks.Node
FakeWallet = mocks.Wallet
FakePeerFinder = mocks.PeerFinder
FakeAnnouncer = mocks.Announcer
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
GenFile = mocks.GenFile
test_create_stream_sd_file = mocks.create_stream_sd_file

# Size multiplier (2**20) used when sizing the generated test file.
MB = 1024 * 1024
|
|
|
|
|
|
def iv_generator():
    """Yield the same initialization vector forever.

    Every value produced is the byte ``b'3'`` repeated
    ``AES.block_size // 8`` times, which keeps the encrypted output (and
    therefore the hashes the tests compare against) deterministic.
    """
    iv_length = AES.block_size // 8
    constant_iv = b'3' * iv_length
    while True:
        yield constant_iv
|
|
|
|
|
|
class CreateEncryptedFileTest(unittest.TestCase):
    """Tests for ``EncryptedFileCreator.create_lbry_file``.

    Each test runs against a freshly created temporary database directory
    and blob directory. The encryption key and the IVs are fixed, so the
    resulting stream hash and sd hash can be compared with hard-coded
    expected values.
    """

    # trial fails a test whose Deferred has not fired within this many seconds.
    timeout = 5

    def setUp(self):
        """Build the storage, blob manager and file manager used by the tests.

        Returns the Deferred of ``storage.setup()`` chained with
        ``lbry_file_manager.setup()``, so the test only starts once both
        have completed.
        """
        mocks.mock_conf_settings(self)
        self.tmp_db_dir, self.tmp_blob_dir = mk_db_and_blob_dir()
        self.wallet = FakeWallet()
        self.peer_manager = PeerManager()
        self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2)
        self.rate_limiter = DummyRateLimiter()
        self.sd_identifier = StreamDescriptorIdentifier()
        self.storage = SQLiteStorage(self.tmp_db_dir)
        self.blob_manager = DiskBlobManager(self.tmp_blob_dir, self.storage)
        self.prm = OnlyFreePaymentsManager()
        self.lbry_file_manager = EncryptedFileManager(self.peer_finder, self.rate_limiter, self.blob_manager,
                                                      self.wallet, self.prm, self.storage, self.sd_identifier)
        d = self.storage.setup()
        d.addCallback(lambda _: self.lbry_file_manager.setup())
        return d

    @defer.inlineCallbacks
    def tearDown(self):
        """Stop the file manager, blob manager and storage, then remove the temp dirs."""
        try:
            yield self.lbry_file_manager.stop()
            yield self.blob_manager.stop()
            yield self.storage.stop()
        finally:
            # Remove the temporary directories even when one of the stop()
            # calls fails; otherwise a failing test leaks them on disk.
            rm_db_and_blob_dir(self.tmp_db_dir, self.tmp_blob_dir)

    @defer.inlineCallbacks
    def create_file(self, filename):
        """Create an lbry file named ``filename`` with a fixed key and fixed IVs.

        The content comes from ``mocks.GenFile(3*MB, '1')``; the key is the
        byte ``b'2'`` repeated ``AES.block_size // 8`` times and the IVs come
        from ``iv_generator()``.

        Fires with whatever ``EncryptedFileCreator.create_lbry_file`` fires
        with (the tests read ``stream_hash`` and ``sd_hash`` from it).
        """
        handle = mocks.GenFile(3*MB, '1')
        key = b'2' * (AES.block_size // 8)
        out = yield EncryptedFileCreator.create_lbry_file(
            self.blob_manager, self.storage, self.prm, self.lbry_file_manager, filename, handle, key, iv_generator()
        )
        defer.returnValue(out)

    @defer.inlineCallbacks
    def test_can_create_file(self):
        # Creating a file must yield the expected stream/sd hashes, an sd blob
        # whose contents match what the database reports, and three blobs of
        # which the last one has length 0.
        expected_stream_hash = b"41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \
                               b"7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25"
        expected_sd_hash = "40c485432daec586c1a2d247e6c08d137640a5af6e81f3f652" \
                           "3e62e81a2e8945b0db7c94f1852e70e371d917b994352c"
        filename = 'test.file'
        lbry_file = yield self.create_file(filename)
        sd_hash = yield self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)

        # read the sd blob file
        sd_blob = self.blob_manager.blobs[sd_hash]
        sd_reader = BlobStreamDescriptorReader(sd_blob)
        sd_file_info = yield sd_reader.get_info()

        # this comes from the database, the blobs returned are sorted
        sd_info = yield get_sd_info(self.storage, lbry_file.stream_hash, include_blobs=True)
        # Show the complete diff if the dictionaries below do not match.
        self.maxDiff = None
        # Round-trip through JSON so bytes values become text before the
        # comparison with the info read back from the sd blob.
        unicode_sd_info = json.loads(json.dumps(sd_info, sort_keys=True, cls=JSONBytesEncoder))
        self.assertDictEqual(unicode_sd_info, sd_file_info)
        self.assertEqual(sd_info['stream_hash'], expected_stream_hash)
        self.assertEqual(len(sd_info['blobs']), 3)
        self.assertNotEqual(sd_info['blobs'][0]['length'], 0)
        self.assertNotEqual(sd_info['blobs'][1]['length'], 0)
        self.assertEqual(sd_info['blobs'][2]['length'], 0)
        self.assertEqual(expected_stream_hash, lbry_file.stream_hash)
        self.assertEqual(sd_hash, lbry_file.sd_hash)
        self.assertEqual(sd_hash, expected_sd_hash)
        blobs = yield self.blob_manager.get_all_verified_blobs()
        self.assertEqual(3, len(blobs))
        num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()
        self.assertEqual(2, num_should_announce_blobs)

    @defer.inlineCallbacks
    def test_can_create_file_with_unicode_filename(self):
        # A non-ASCII file name must be accepted and produce the expected
        # stream hash.
        expected_stream_hash = (b'd1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0'
                                b'2fd5adb86fe144e93e110075b5865fff8617776c6c0')
        filename = u'☃.file'
        lbry_file = yield self.create_file(filename)
        self.assertEqual(expected_stream_hash, lbry_file.stream_hash)
|