import tempfile import shutil from twisted.trial import unittest from twisted.internet import defer from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.core.Error import NoSuchStreamHash from lbrynet.tests.util import random_lbry_hash class DBEncryptedFileMetadataManagerTest(unittest.TestCase): def setUp(self): self.db_dir = tempfile.mkdtemp() self.manager = DBEncryptedFileMetadataManager(self.db_dir) def tearDown(self): self.manager.stop() shutil.rmtree(self.db_dir) @defer.inlineCallbacks def test_basic(self): yield self.manager.setup() out = yield self.manager.get_all_streams() self.assertEqual(len(out), 0) stream_hash = random_lbry_hash() file_name = 'file_name' key = 'key' suggested_file_name = 'sug_file_name' blob1 = CryptBlobInfo(random_lbry_hash(), 0, 10, 1) blob2 = CryptBlobInfo(random_lbry_hash(), 0, 10, 1) blobs = [blob1, blob2] # save stream yield self.manager.save_stream(stream_hash, file_name, key, suggested_file_name, blobs) out = yield self.manager.get_stream_info(stream_hash) self.assertEqual(key, out[0]) self.assertEqual(file_name, out[1]) self.assertEqual(suggested_file_name, out[2]) out = yield self.manager.check_if_stream_exists(stream_hash) self.assertTrue(out) out = yield self.manager.get_blobs_for_stream(stream_hash) self.assertEqual(2, len(out)) out = yield self.manager.get_all_streams() self.assertEqual(1, len(out)) # add a blob to stream blob3 = CryptBlobInfo(random_lbry_hash(), 0, 10, 1) blobs = [blob3] out = yield self.manager.add_blobs_to_stream(stream_hash, blobs) out = yield self.manager.get_blobs_for_stream(stream_hash) self.assertEqual(3, len(out)) out = yield self.manager.get_stream_of_blob(blob3.blob_hash) self.assertEqual(stream_hash, out) # check non existing stream with self.assertRaises(NoSuchStreamHash): out = yield self.manager.get_stream_info(random_lbry_hash()) # check save of sd blob hash sd_blob_hash = random_lbry_hash() yield self.manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) out = yield self.manager.get_sd_blob_hashes_for_stream(stream_hash) self.assertEqual(1, len(out)) self.assertEqual(sd_blob_hash, out[0]) out = yield self.manager.get_stream_hash_for_sd_hash(sd_blob_hash) self.assertEqual(stream_hash, out) # delete stream yield self.manager.delete_stream(stream_hash) out = yield self.manager.check_if_stream_exists(stream_hash) self.assertFalse(out)