work on unit tests for file management related classes
This commit is contained in:
parent
eae8f8a00e
commit
f600038712
5 changed files with 147 additions and 0 deletions
0
tests/unit/lbryfile/__init__.py
Normal file
0
tests/unit/lbryfile/__init__.py
Normal file
0
tests/unit/lbryfile/client/__init__.py
Normal file
0
tests/unit/lbryfile/client/__init__.py
Normal file
32
tests/unit/lbryfile/client/test_EncryptedFileDownloader.py
Normal file
32
tests/unit/lbryfile/client/test_EncryptedFileDownloader.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
import os.path
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver
|
||||
|
||||
|
||||
|
||||
class TestEncryptedFileSaver(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_setup_output(self):
|
||||
file_name = 'encrypted_file_saver_test.tmp'
|
||||
self.assertFalse(os.path.isfile(file_name))
|
||||
|
||||
# create file in the temporary trial folder
|
||||
stream_hash = ''
|
||||
peer_finder = None
|
||||
rate_limiter = None
|
||||
blob_manager = None
|
||||
stream_info_manager = None
|
||||
payment_rate_manager = None
|
||||
wallet = None
|
||||
download_directory = '.'
|
||||
upload_allowed = False
|
||||
saver = EncryptedFileSaver(stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet, download_directory, file_name)
|
||||
|
||||
yield saver._setup_output()
|
||||
self.assertTrue(os.path.isfile(file_name))
|
||||
saver._close_output()
|
||||
|
||||
|
80
tests/unit/lbryfile/test_EncryptedFileMetadataManager.py
Normal file
80
tests/unit/lbryfile/test_EncryptedFileMetadataManager.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
import tempfile
|
||||
import shutil
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
||||
from lbrynet.core import utils
|
||||
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
|
||||
from lbrynet.core.Error import NoSuchStreamHash
|
||||
from tests.util import random_lbry_hash
|
||||
|
||||
class DBEncryptedFileMetadataManagerTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.db_dir = tempfile.mkdtemp()
|
||||
self.manager = DBEncryptedFileMetadataManager(self.db_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.db_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_basic(self):
|
||||
yield self.manager.setup()
|
||||
out = yield self.manager.get_all_streams()
|
||||
self.assertEqual(len(out),0)
|
||||
|
||||
stream_hash = random_lbry_hash()
|
||||
file_name = 'file_name'
|
||||
key = 'key'
|
||||
suggested_file_name = 'sug_file_name'
|
||||
blob1 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
||||
blob2 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
||||
blobs=[blob1,blob2]
|
||||
|
||||
# save stream
|
||||
yield self.manager.save_stream(stream_hash, file_name, key, suggested_file_name, blobs)
|
||||
|
||||
out = yield self.manager.get_stream_info(stream_hash)
|
||||
self.assertEqual(key, out[0])
|
||||
self.assertEqual(file_name, out[1])
|
||||
self.assertEqual(suggested_file_name, out[2])
|
||||
|
||||
out = yield self.manager.check_if_stream_exists(stream_hash)
|
||||
self.assertTrue(out)
|
||||
|
||||
out = yield self.manager.get_blobs_for_stream(stream_hash)
|
||||
self.assertEqual(2, len(out))
|
||||
|
||||
out = yield self.manager.get_all_streams()
|
||||
self.assertEqual(1, len(out))
|
||||
|
||||
# add a blob to stream
|
||||
blob3 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
||||
blobs = [blob3]
|
||||
out = yield self.manager.add_blobs_to_stream(stream_hash,blobs)
|
||||
out = yield self.manager.get_blobs_for_stream(stream_hash)
|
||||
self.assertEqual(3, len(out))
|
||||
|
||||
out = yield self.manager.get_stream_of_blob(blob3.blob_hash)
|
||||
self.assertEqual(stream_hash, out)
|
||||
|
||||
# check non existing stream
|
||||
with self.assertRaises(NoSuchStreamHash):
|
||||
out = yield self.manager.get_stream_info(random_lbry_hash())
|
||||
|
||||
# check save of sd blob hash
|
||||
sd_blob_hash = random_lbry_hash()
|
||||
yield self.manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
|
||||
out = yield self.manager.get_sd_blob_hashes_for_stream(stream_hash)
|
||||
self.assertEqual(1, len(out))
|
||||
self.assertEqual(sd_blob_hash,out[0])
|
||||
|
||||
out = yield self.manager.get_stream_hash_for_sd_hash(sd_blob_hash)
|
||||
self.assertEqual(stream_hash, out)
|
||||
|
||||
# delete stream
|
||||
yield self.manager.delete_stream(stream_hash)
|
||||
out = yield self.manager.check_if_stream_exists(stream_hash)
|
||||
self.assertFalse(out)
|
||||
|
||||
|
||||
|
35
tests/unit/lbryfilemanager/test_EncryptedFileManager.py
Normal file
35
tests/unit/lbryfilemanager/test_EncryptedFileManager.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
from twisted.internet import defer
|
||||
from twisted.trial import unittest
|
||||
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
||||
from tests.util import random_lbry_hash
|
||||
|
||||
class TestEncryptedFileManager(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_database_operations(self):
|
||||
# test database read/write functions in EncrypteFileManager
|
||||
|
||||
class MocSession(object):
|
||||
pass
|
||||
|
||||
session = MocSession()
|
||||
session.db_dir = '.'
|
||||
stream_info_manager = None
|
||||
sd_identifier = None
|
||||
download_directory = '.'
|
||||
manager = EncryptedFileManager(session, stream_info_manager, sd_identifier, download_directory)
|
||||
yield manager._open_db()
|
||||
out = yield manager._get_all_lbry_files()
|
||||
self.assertEqual(len(out),0)
|
||||
|
||||
stream_hash = random_lbry_hash()
|
||||
blob_data_rate = 0
|
||||
out = yield manager._save_lbry_file(stream_hash, blob_data_rate)
|
||||
rowid = yield manager._get_rowid_for_stream_hash(stream_hash)
|
||||
self.assertEqual(out, rowid)
|
||||
files = yield manager._get_all_lbry_files()
|
||||
self.assertEqual(1, len(files))
|
||||
yield manager._change_file_status(rowid, ManagedEncryptedFileDownloader.STATUS_RUNNING)
|
||||
out = yield manager._get_lbry_file_status(rowid)
|
||||
self.assertEqual(out, ManagedEncryptedFileDownloader.STATUS_RUNNING)
|
Loading…
Add table
Reference in a new issue