forked from LBRYCommunity/lbry-sdk
81 lines
2.8 KiB
Python
81 lines
2.8 KiB
Python
import tempfile
|
|
import shutil
|
|
from twisted.trial import unittest
|
|
from twisted.internet import defer
|
|
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
|
from lbrynet.core import utils
|
|
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
|
|
from lbrynet.core.Error import NoSuchStreamHash
|
|
from tests.util import random_lbry_hash
|
|
|
|
class DBEncryptedFileMetadataManagerTest(unittest.TestCase):
|
|
def setUp(self):
|
|
self.db_dir = tempfile.mkdtemp()
|
|
self.manager = DBEncryptedFileMetadataManager(self.db_dir)
|
|
|
|
def tearDown(self):
|
|
self.manager.stop()
|
|
shutil.rmtree(self.db_dir)
|
|
|
|
@defer.inlineCallbacks
|
|
def test_basic(self):
|
|
yield self.manager.setup()
|
|
out = yield self.manager.get_all_streams()
|
|
self.assertEqual(len(out),0)
|
|
|
|
stream_hash = random_lbry_hash()
|
|
file_name = 'file_name'
|
|
key = 'key'
|
|
suggested_file_name = 'sug_file_name'
|
|
blob1 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
|
blob2 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
|
blobs=[blob1,blob2]
|
|
|
|
# save stream
|
|
yield self.manager.save_stream(stream_hash, file_name, key, suggested_file_name, blobs)
|
|
|
|
out = yield self.manager.get_stream_info(stream_hash)
|
|
self.assertEqual(key, out[0])
|
|
self.assertEqual(file_name, out[1])
|
|
self.assertEqual(suggested_file_name, out[2])
|
|
|
|
out = yield self.manager.check_if_stream_exists(stream_hash)
|
|
self.assertTrue(out)
|
|
|
|
out = yield self.manager.get_blobs_for_stream(stream_hash)
|
|
self.assertEqual(2, len(out))
|
|
|
|
out = yield self.manager.get_all_streams()
|
|
self.assertEqual(1, len(out))
|
|
|
|
# add a blob to stream
|
|
blob3 = CryptBlobInfo(random_lbry_hash(),0,10,1)
|
|
blobs = [blob3]
|
|
out = yield self.manager.add_blobs_to_stream(stream_hash,blobs)
|
|
out = yield self.manager.get_blobs_for_stream(stream_hash)
|
|
self.assertEqual(3, len(out))
|
|
|
|
out = yield self.manager.get_stream_of_blob(blob3.blob_hash)
|
|
self.assertEqual(stream_hash, out)
|
|
|
|
# check non existing stream
|
|
with self.assertRaises(NoSuchStreamHash):
|
|
out = yield self.manager.get_stream_info(random_lbry_hash())
|
|
|
|
# check save of sd blob hash
|
|
sd_blob_hash = random_lbry_hash()
|
|
yield self.manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
|
|
out = yield self.manager.get_sd_blob_hashes_for_stream(stream_hash)
|
|
self.assertEqual(1, len(out))
|
|
self.assertEqual(sd_blob_hash,out[0])
|
|
|
|
out = yield self.manager.get_stream_hash_for_sd_hash(sd_blob_hash)
|
|
self.assertEqual(stream_hash, out)
|
|
|
|
# delete stream
|
|
yield self.manager.delete_stream(stream_hash)
|
|
out = yield self.manager.check_if_stream_exists(stream_hash)
|
|
self.assertFalse(out)
|
|
|
|
|
|
|