lbry-sdk/lbry/tests/unit/database/test_SQLiteStorage.py

261 lines
10 KiB
Python

import shutil
import tempfile
import unittest
import asyncio
import logging
import hashlib
from torba.testcase import AsyncioTestCase
from lbry.conf import Config
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.blob.blob_info import BlobInfo
from lbry.blob.blob_manager import BlobManager
from lbry.stream.descriptor import StreamDescriptor
from tests.test_utils import random_lbry_hash
from lbry.dht.peer import make_kademlia_peer
log = logging.getLogger()
def blob_info_dict(blob_info):
info = {
"length": blob_info.length,
"blob_num": blob_info.blob_num,
"iv": blob_info.iv
}
if blob_info.length:
info['blob_hash'] = blob_info.blob_hash
return info
fake_claim_info = {
'name': "test",
'claim_id': 'deadbeef' * 5,
'address': "bT6wc54qiUUYt34HQF9wnW8b2o2yQTXf2S",
'claim_sequence': 1,
'value': {
"version": "_0_0_1",
"claimType": "streamType",
"stream": {
"source": {
"source": 'deadbeef' * 12,
"version": "_0_0_1",
"contentType": "video/mp4",
"sourceType": "lbry_sd_hash"
},
"version": "_0_0_1",
"metadata": {
"license": "LBRY inc",
"description": "What is LBRY? An introduction with Alex Tabarrok",
"language": "en",
"title": "What is LBRY?",
"author": "Samuel Bryan",
"version": "_0_1_0",
"nsfw": False,
"licenseUrl": "",
"preview": "",
"thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png"
}
}
},
'height': 10000,
'amount': '1.0',
'effective_amount': '1.0',
'nout': 0,
'txid': "deadbeef" * 8,
'supports': [],
'channel_claim_id': None,
'channel_name': None
}
class StorageTest(AsyncioTestCase):
async def asyncSetUp(self):
self.conf = Config()
self.storage = SQLiteStorage(self.conf, ':memory:')
self.blob_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.blob_dir)
self.blob_manager = BlobManager(asyncio.get_event_loop(), self.blob_dir, self.storage, self.conf)
await self.storage.open()
async def asyncTearDown(self):
await self.storage.close()
async def store_fake_blob(self, blob_hash, length=100):
await self.storage.add_blobs((blob_hash, length), finished=True)
async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]
descriptor = StreamDescriptor(
asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash
)
sd_blob = await descriptor.make_sd_blob()
await self.storage.store_stream(sd_blob, descriptor)
return descriptor
async def make_and_store_fake_stream(self, blob_count=2, stream_hash=None):
stream_hash = stream_hash or random_lbry_hash()
blobs = [
BlobInfo(i + 1, 100, "DEADBEEF", random_lbry_hash())
for i in range(blob_count)
]
await self.store_fake_stream(stream_hash, blobs)
class TestSQLiteStorage(StorageTest):
async def test_setup(self):
files = await self.storage.get_all_lbry_files()
self.assertEqual(len(files), 0)
blobs = await self.storage.get_all_blob_hashes()
self.assertEqual(len(blobs), 0)
async def test_store_blob(self):
blob_hash = random_lbry_hash()
await self.store_fake_blob(blob_hash)
blob_hashes = await self.storage.get_all_blob_hashes()
self.assertEqual(blob_hashes, [blob_hash])
async def test_delete_blob(self):
blob_hash = random_lbry_hash()
await self.store_fake_blob(blob_hash)
blob_hashes = await self.storage.get_all_blob_hashes()
self.assertEqual(blob_hashes, [blob_hash])
await self.storage.delete_blobs_from_db(blob_hashes)
blob_hashes = await self.storage.get_all_blob_hashes()
self.assertEqual(blob_hashes, [])
async def test_supports_storage(self):
claim_ids = [random_lbry_hash() for _ in range(10)]
random_supports = [{
"txid": random_lbry_hash(),
"nout": i,
"address": f"addr{i}",
"amount": f"{i}.0"
} for i in range(20)]
expected_supports = {}
for idx, claim_id in enumerate(claim_ids):
await self.storage.save_supports({claim_id: random_supports[idx*2:idx*2+2]})
for random_support in random_supports[idx*2:idx*2+2]:
random_support['claim_id'] = claim_id
expected_supports.setdefault(claim_id, []).append(random_support)
supports = await self.storage.get_supports(claim_ids[0])
self.assertEqual(supports, expected_supports[claim_ids[0]])
all_supports = await self.storage.get_supports(*claim_ids)
for support in all_supports:
self.assertIn(support, expected_supports[support['claim_id']])
class StreamStorageTests(StorageTest):
async def test_store_and_delete_stream(self):
stream_hash = random_lbry_hash()
descriptor = await self.store_fake_stream(stream_hash)
files = await self.storage.get_all_lbry_files()
self.assertListEqual(files, [])
stream_hashes = await self.storage.get_all_stream_hashes()
self.assertListEqual(stream_hashes, [stream_hash])
await self.storage.delete_stream(descriptor)
files = await self.storage.get_all_lbry_files()
self.assertListEqual(files, [])
stream_hashes = await self.storage.get_all_stream_hashes()
self.assertListEqual(stream_hashes, [])
@unittest.SkipTest
class FileStorageTests(StorageTest):
async def test_store_file(self):
download_directory = self.db_dir
out = await self.storage.get_all_lbry_files()
self.assertEqual(len(out), 0)
stream_hash = random_lbry_hash()
sd_hash = random_lbry_hash()
blob1 = random_lbry_hash()
blob2 = random_lbry_hash()
await self.store_fake_blob(sd_hash)
await self.store_fake_blob(blob1)
await self.store_fake_blob(blob2)
await self.store_fake_stream(stream_hash, sd_hash)
await self.store_fake_stream_blob(stream_hash, blob1, 1)
await self.store_fake_stream_blob(stream_hash, blob2, 2)
blob_data_rate = 0
file_name = "test file"
await self.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate
)
files = await self.storage.get_all_lbry_files()
self.assertEqual(1, len(files))
@unittest.SkipTest
class ContentClaimStorageTests(StorageTest):
async def test_store_content_claim(self):
download_directory = self.db_dir
out = await self.storage.get_all_lbry_files()
self.assertEqual(len(out), 0)
stream_hash = random_lbry_hash()
sd_hash = fake_claim_info['value']['stream']['source']['source']
# test that we can associate a content claim to a file
# use the generated sd hash in the fake claim
fake_outpoint = "%s:%i" % (fake_claim_info['txid'], fake_claim_info['nout'])
await self.make_and_store_fake_stream(blob_count=2, stream_hash=stream_hash, sd_hash=sd_hash)
blob_data_rate = 0
file_name = "test file"
await self.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate
)
await self.storage.save_claims([fake_claim_info])
await self.storage.save_content_claim(stream_hash, fake_outpoint)
stored_content_claim = await self.storage.get_content_claim(stream_hash)
self.assertDictEqual(stored_content_claim, fake_claim_info)
stream_hashes = await self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'],
stream_hash)
self.assertListEqual(stream_hashes, [])
# test that we can't associate a claim update with a new stream to the file
second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash()
await self.make_and_store_fake_stream(blob_count=2, stream_hash=second_stream_hash, sd_hash=second_sd_hash)
with self.assertRaisesRegex(Exception, "stream mismatch"):
await self.storage.save_content_claim(second_stream_hash, fake_outpoint)
# test that we can associate a new claim update containing the same stream to the file
update_info = deepcopy(fake_claim_info)
update_info['txid'] = "beef0000" * 12
update_info['nout'] = 0
second_outpoint = "%s:%i" % (update_info['txid'], update_info['nout'])
await self.storage.save_claims([update_info])
await self.storage.save_content_claim(stream_hash, second_outpoint)
update_info_result = await self.storage.get_content_claim(stream_hash)
self.assertDictEqual(update_info_result, update_info)
# test that we can't associate an update with a mismatching claim id
invalid_update_info = deepcopy(fake_claim_info)
invalid_update_info['txid'] = "beef0001" * 12
invalid_update_info['nout'] = 0
invalid_update_info['claim_id'] = "beef0002" * 5
invalid_update_outpoint = "%s:%i" % (invalid_update_info['txid'], invalid_update_info['nout'])
with self.assertRaisesRegex(Exception, "mismatching claim ids when updating stream "
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeef "
"vs beef0002beef0002beef0002beef0002beef0002"):
await self.storage.save_claims([invalid_update_info])
await self.storage.save_content_claim(stream_hash, invalid_update_outpoint)
current_claim_info = await self.storage.get_content_claim(stream_hash)
# this should still be the previous update
self.assertDictEqual(current_claim_info, update_info)
class UpdatePeersTest(StorageTest):
async def test_update_get_peers(self):
node_id = hashlib.sha384("1234".encode()).digest()
args = (node_id, '73.186.148.72', 4444, None)
fake_peer = make_kademlia_peer(*args)
await self.storage.save_kademlia_peers([fake_peer])
peers = await self.storage.get_persisted_kademlia_peers()
self.assertTupleEqual(args, peers[0])