forked from LBRYCommunity/lbry-sdk
Merge pull request #493 from lbryio/filemanager_related_tests
Filemanager related unit tests
This commit is contained in:
commit
da99f3f22b
7 changed files with 158 additions and 5 deletions
|
@ -1,9 +1,10 @@
|
|||
import os
|
||||
import binascii
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer,task
|
||||
from lbrynet.core import log_support, utils
|
||||
|
||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer,DHTHashSupplier
|
||||
from lbrynet.core.utils import random_string
|
||||
from lbrynet.core import log_support, utils
|
||||
from tests.util import random_lbry_hash
|
||||
|
||||
class MocDHTNode(object):
|
||||
def __init__(self):
|
||||
|
@ -30,7 +31,7 @@ class DHTHashAnnouncerTest(unittest.TestCase):
|
|||
self.num_blobs = 10
|
||||
self.blobs_to_announce = []
|
||||
for i in range(0, self.num_blobs):
|
||||
self.blobs_to_announce.append(binascii.b2a_hex(os.urandom(32)))
|
||||
self.blobs_to_announce.append(random_lbry_hash())
|
||||
self.clock = task.Clock()
|
||||
self.dht_node = MocDHTNode()
|
||||
utils.call_later = self.clock.callLater
|
||||
|
@ -49,7 +50,7 @@ class DHTHashAnnouncerTest(unittest.TestCase):
|
|||
def test_immediate_announce(self):
|
||||
# Test that immediate announce puts a hash at the front of the queue
|
||||
self.announcer._announce_available_hashes()
|
||||
blob_hash = binascii.b2a_hex(os.urandom(32))
|
||||
blob_hash = random_lbry_hash()
|
||||
self.announcer.immediate_announce([blob_hash])
|
||||
self.assertEqual(self.announcer.hash_queue_size(),self.announcer.CONCURRENT_ANNOUNCERS+1)
|
||||
self.assertEqual(blob_hash, self.announcer.hash_queue[0][0])
|
||||
|
|
0
tests/unit/lbryfile/__init__.py
Normal file
0
tests/unit/lbryfile/__init__.py
Normal file
0
tests/unit/lbryfile/client/__init__.py
Normal file
0
tests/unit/lbryfile/client/__init__.py
Normal file
32
tests/unit/lbryfile/client/test_EncryptedFileDownloader.py
Normal file
32
tests/unit/lbryfile/client/test_EncryptedFileDownloader.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
import os.path
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver
|
||||
|
||||
|
||||
|
||||
class TestEncryptedFileSaver(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_setup_output(self):
|
||||
file_name = 'encrypted_file_saver_test.tmp'
|
||||
self.assertFalse(os.path.isfile(file_name))
|
||||
|
||||
# create file in the temporary trial folder
|
||||
stream_hash = ''
|
||||
peer_finder = None
|
||||
rate_limiter = None
|
||||
blob_manager = None
|
||||
stream_info_manager = None
|
||||
payment_rate_manager = None
|
||||
wallet = None
|
||||
download_directory = '.'
|
||||
upload_allowed = False
|
||||
saver = EncryptedFileSaver(stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet, download_directory, file_name)
|
||||
|
||||
yield saver._setup_output()
|
||||
self.assertTrue(os.path.isfile(file_name))
|
||||
saver._close_output()
|
||||
|
||||
|
80
tests/unit/lbryfile/test_EncryptedFileMetadataManager.py
Normal file
80
tests/unit/lbryfile/test_EncryptedFileMetadataManager.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
import tempfile
|
||||
import shutil
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
||||
from lbrynet.core import utils
|
||||
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
|
||||
from lbrynet.core.Error import NoSuchStreamHash
|
||||
from tests.util import random_lbry_hash
|
||||
|
||||
class DBEncryptedFileMetadataManagerTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.db_dir = tempfile.mkdtemp()
|
||||
self.manager = DBEncryptedFileMetadataManager(self.db_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.db_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_basic(self):
|
||||
yield self.manager.setup()
|
||||
out = yield self.manager.get_all_streams()
|
||||
self.assertEqual(len(out),0)
|
||||
|
||||
stream_hash = random_lbry_hash()
|
||||
file_name = 'file_name'
|
||||
key = 'key'
|
||||
suggested_file_name = 'sug_file_name'
|
||||
blob1 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
||||
blob2 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
||||
blobs=[blob1,blob2]
|
||||
|
||||
# save stream
|
||||
yield self.manager.save_stream(stream_hash, file_name, key, suggested_file_name, blobs)
|
||||
|
||||
out = yield self.manager.get_stream_info(stream_hash)
|
||||
self.assertEqual(key, out[0])
|
||||
self.assertEqual(file_name, out[1])
|
||||
self.assertEqual(suggested_file_name, out[2])
|
||||
|
||||
out = yield self.manager.check_if_stream_exists(stream_hash)
|
||||
self.assertTrue(out)
|
||||
|
||||
out = yield self.manager.get_blobs_for_stream(stream_hash)
|
||||
self.assertEqual(2, len(out))
|
||||
|
||||
out = yield self.manager.get_all_streams()
|
||||
self.assertEqual(1, len(out))
|
||||
|
||||
# add a blob to stream
|
||||
blob3 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
||||
blobs = [blob3]
|
||||
out = yield self.manager.add_blobs_to_stream(stream_hash,blobs)
|
||||
out = yield self.manager.get_blobs_for_stream(stream_hash)
|
||||
self.assertEqual(3, len(out))
|
||||
|
||||
out = yield self.manager.get_stream_of_blob(blob3.blob_hash)
|
||||
self.assertEqual(stream_hash, out)
|
||||
|
||||
# check non existing stream
|
||||
with self.assertRaises(NoSuchStreamHash):
|
||||
out = yield self.manager.get_stream_info(random_lbry_hash())
|
||||
|
||||
# check save of sd blob hash
|
||||
sd_blob_hash = random_lbry_hash()
|
||||
yield self.manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
|
||||
out = yield self.manager.get_sd_blob_hashes_for_stream(stream_hash)
|
||||
self.assertEqual(1, len(out))
|
||||
self.assertEqual(sd_blob_hash,out[0])
|
||||
|
||||
out = yield self.manager.get_stream_hash_for_sd_hash(sd_blob_hash)
|
||||
self.assertEqual(stream_hash, out)
|
||||
|
||||
# delete stream
|
||||
yield self.manager.delete_stream(stream_hash)
|
||||
out = yield self.manager.check_if_stream_exists(stream_hash)
|
||||
self.assertFalse(out)
|
||||
|
||||
|
||||
|
35
tests/unit/lbryfilemanager/test_EncryptedFileManager.py
Normal file
35
tests/unit/lbryfilemanager/test_EncryptedFileManager.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
from twisted.internet import defer
|
||||
from twisted.trial import unittest
|
||||
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
||||
from tests.util import random_lbry_hash
|
||||
|
||||
class TestEncryptedFileManager(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_database_operations(self):
|
||||
# test database read/write functions in EncrypteFileManager
|
||||
|
||||
class MocSession(object):
|
||||
pass
|
||||
|
||||
session = MocSession()
|
||||
session.db_dir = '.'
|
||||
stream_info_manager = None
|
||||
sd_identifier = None
|
||||
download_directory = '.'
|
||||
manager = EncryptedFileManager(session, stream_info_manager, sd_identifier, download_directory)
|
||||
yield manager._open_db()
|
||||
out = yield manager._get_all_lbry_files()
|
||||
self.assertEqual(len(out),0)
|
||||
|
||||
stream_hash = random_lbry_hash()
|
||||
blob_data_rate = 0
|
||||
out = yield manager._save_lbry_file(stream_hash, blob_data_rate)
|
||||
rowid = yield manager._get_rowid_for_stream_hash(stream_hash)
|
||||
self.assertEqual(out, rowid)
|
||||
files = yield manager._get_all_lbry_files()
|
||||
self.assertEqual(1, len(files))
|
||||
yield manager._change_file_status(rowid, ManagedEncryptedFileDownloader.STATUS_RUNNING)
|
||||
out = yield manager._get_lbry_file_status(rowid)
|
||||
self.assertEqual(out, ManagedEncryptedFileDownloader.STATUS_RUNNING)
|
|
@ -1,5 +1,7 @@
|
|||
import datetime
|
||||
import time
|
||||
import binascii
|
||||
import os
|
||||
|
||||
import mock
|
||||
|
||||
|
@ -8,6 +10,9 @@ DEFAULT_TIMESTAMP = datetime.datetime(2016, 1, 1)
|
|||
DEFAULT_ISO_TIME = time.mktime(DEFAULT_TIMESTAMP.timetuple())
|
||||
|
||||
|
||||
def random_lbry_hash():
|
||||
return binascii.b2a_hex(os.urandom(48))
|
||||
|
||||
def resetTime(test_case, timestamp=DEFAULT_TIMESTAMP):
|
||||
iso_time = time.mktime(timestamp.timetuple())
|
||||
patcher = mock.patch('time.time')
|
||||
|
|
Loading…
Reference in a new issue