import os import asyncio import tempfile import shutil import json from torba.testcase import AsyncioTestCase from lbrynet.conf import Config from lbrynet.error import InvalidStreamDescriptorError from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.stream.descriptor import StreamDescriptor class TestStreamDescriptor(AsyncioTestCase): async def asyncSetUp(self): self.loop = asyncio.get_event_loop() self.key = b'deadbeef' * 4 self.cleartext = os.urandom(20000000) self.tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(self.tmp_dir)) self.storage = SQLiteStorage(Config(), ":memory:") await self.storage.open() self.blob_manager = BlobFileManager(self.loop, self.tmp_dir, self.storage) self.file_path = os.path.join(self.tmp_dir, "test_file") with open(self.file_path, 'wb') as f: f.write(self.cleartext) self.descriptor = await StreamDescriptor.create_stream(self.loop, self.tmp_dir, self.file_path, key=self.key) self.sd_hash = self.descriptor.calculate_sd_hash() self.sd_dict = json.loads(self.descriptor.as_json()) def _write_sd(self): with open(os.path.join(self.tmp_dir, self.sd_hash), 'wb') as f: f.write(json.dumps(self.sd_dict, sort_keys=True).encode()) async def _test_invalid_sd(self): self._write_sd() with self.assertRaises(InvalidStreamDescriptorError): await self.blob_manager.get_stream_descriptor(self.sd_hash) async def test_load_sd_blob(self): self._write_sd() descriptor = await self.blob_manager.get_stream_descriptor(self.sd_hash) self.assertEqual(descriptor.calculate_sd_hash(), self.sd_hash) async def test_missing_terminator(self): self.sd_dict['blobs'].pop() await self._test_invalid_sd() async def test_terminator_not_at_end(self): terminator = self.sd_dict['blobs'].pop() self.sd_dict['blobs'] = [terminator] + self.sd_dict['blobs'] await self._test_invalid_sd() async def test_terminator_has_blob_hash(self): self.sd_dict['blobs'][-1]['blob_hash'] = '1' * 96 await self._test_invalid_sd() async def test_blob_order(self): terminator = self.sd_dict['blobs'].pop() self.sd_dict['blobs'].reverse() self.sd_dict['blobs'].append(terminator) await self._test_invalid_sd() async def test_skip_blobs(self): self.sd_dict['blobs'][-2]['blob_num'] = self.sd_dict['blobs'][-2]['blob_num'] + 1 await self._test_invalid_sd() async def test_invalid_stream_hash(self): self.sd_dict['blobs'][-2]['blob_hash'] = '1' * 96 await self._test_invalid_sd() async def test_zero_length_blob(self): self.sd_dict['blobs'][-2]['length'] = 0 await self._test_invalid_sd()