import os import hashlib import aiohttp import aiohttp.web import asyncio from lbry.utils import aiohttp_request from lbry.blob.blob_file import MAX_BLOB_SIZE from lbry.testcase import CommandTestCase def get_random_bytes(n: int) -> bytes: result = b''.join(hashlib.sha256(os.urandom(4)).digest() for _ in range(n // 16)) if len(result) < n: result += os.urandom(n - len(result)) elif len(result) > n: result = result[:-(len(result) - n)] assert len(result) == n, (n, len(result)) return result class RangeRequests(CommandTestCase): async def _restart_stream_manager(self): self.daemon.file_manager.stop() await self.daemon.file_manager.start() return async def _setup_stream(self, data: bytes, save_blobs: bool = True, save_files: bool = False, file_size=0): self.daemon.conf.save_blobs = save_blobs self.daemon.conf.save_files = save_files self.data = data await self.stream_create('foo', '0.01', data=self.data, file_size=file_size) if save_blobs: self.assertGreater(len(os.listdir(self.daemon.blob_manager.blob_dir)), 1) await (await self.daemon.jsonrpc_file_list())['items'][0].fully_reflected.wait() await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo') self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir))) # await self._restart_stream_manager() await self.daemon.streaming_runner.setup() site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, self.daemon.conf.streaming_port) await site.start() self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) async def _test_range_requests(self): name = 'foo' url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/{name}' async with aiohttp_request('get', url) as req: self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') content_range = req.headers.get('Content-Range') content_length = int(req.headers.get('Content-Length')) streamed_bytes = await req.content.read() self.assertEqual(content_length, len(streamed_bytes)) return streamed_bytes, content_range, content_length async def test_range_requests_2_byte(self): self.data = b'hi' await self._setup_stream(self.data) streamed, content_range, content_length = await self._test_range_requests() self.assertEqual(15, content_length) self.assertEqual(b'hi\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', streamed) self.assertEqual('bytes 0-14/15', content_range) async def test_range_requests_15_byte(self): self.data = b'123456789abcdef' await self._setup_stream(self.data) streamed, content_range, content_length = await self._test_range_requests() self.assertEqual(15, content_length) self.assertEqual(15, len(streamed)) self.assertEqual(self.data, streamed) self.assertEqual('bytes 0-14/15', content_range) async def test_range_requests_0_padded_bytes(self, size: int = (MAX_BLOB_SIZE - 1) * 4, expected_range: str = 'bytes 0-8388603/8388604', padding=b'', file_size=0): self.data = get_random_bytes(size) await self._setup_stream(self.data, file_size=file_size) streamed, content_range, content_length = await self._test_range_requests() self.assertEqual(len(self.data + padding), content_length) self.assertEqual(streamed, self.data + padding) self.assertEqual(expected_range, content_range) async def test_range_requests_1_padded_bytes(self): await self.test_range_requests_0_padded_bytes( ((MAX_BLOB_SIZE - 1) * 4) - 1, padding=b'\x00' ) async def test_range_requests_2_padded_bytes(self): await self.test_range_requests_0_padded_bytes( ((MAX_BLOB_SIZE - 1) * 4) - 2, padding=b'\x00' * 2 ) async def test_range_requests_14_padded_bytes(self): await self.test_range_requests_0_padded_bytes( ((MAX_BLOB_SIZE - 1) * 4) - 14, padding=b'\x00' * 14 ) async def test_range_requests_no_padding_size_from_claim(self): size = ((MAX_BLOB_SIZE - 1) * 4) - 14 await self.test_range_requests_0_padded_bytes(size, padding=b'', file_size=size, expected_range=f"bytes 0-{size-1}/{size}") async def test_range_requests_15_padded_bytes(self): await self.test_range_requests_0_padded_bytes( ((MAX_BLOB_SIZE - 1) * 4) - 15, padding=b'\x00' * 15 ) async def test_forbidden(self): self.data = get_random_bytes(1000) await self._setup_stream(self.data, file_size=1000) url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/foo' self.daemon.conf.streaming_get = False async with aiohttp_request('get', url) as req: self.assertEqual(403, req.status) async def test_range_requests_last_block_of_last_blob_padding(self): self.data = get_random_bytes(((MAX_BLOB_SIZE - 1) * 4) - 16) await self._setup_stream(self.data) streamed, content_range, content_length = await self._test_range_requests() self.assertEqual(len(self.data), content_length) self.assertEqual(streamed, self.data) self.assertEqual('bytes 0-8388587/8388588', content_range) async def test_streaming_only_with_blobs(self): self.data = get_random_bytes((MAX_BLOB_SIZE - 1) * 4) await self._setup_stream(self.data) await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) # test that repeated range requests do not create duplicate files for _ in range(3): await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) current_files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) # test that a range request after restart does not create a duplicate file await self._restart_stream_manager() current_files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) current_files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) async def test_streaming_only_without_blobs(self): self.data = get_random_bytes((MAX_BLOB_SIZE - 1) * 4) await self._setup_stream(self.data, save_blobs=False) await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) # test that repeated range requests do not create duplicate files for _ in range(3): await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) current_files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) # test that a range request after restart does not create a duplicate file await self._restart_stream_manager() current_files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.download_directory) self.assertIsNone(stream.full_path) current_files_in_download_dir = list(os.scandir(self.daemon.conf.data_dir)) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) async def test_stream_and_save_file_with_blobs(self): self.data = get_random_bytes((MAX_BLOB_SIZE - 1) * 4) await self._setup_stream(self.data, save_files=True) await self._test_range_requests() streams = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(1, len(streams)) stream = streams[0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) full_path = stream.full_path files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) for _ in range(3): await self._test_range_requests() streams = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(1, len(streams)) stream = streams[0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) current_files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) await self._restart_stream_manager() current_files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) streams = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(1, len(streams)) stream = streams[0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) await self._test_range_requests() streams = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(1, len(streams)) stream = streams[0] self.assertTrue(os.path.isfile(self.daemon.blob_manager.get_blob(stream.sd_hash).file_path)) self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) current_files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) with open(stream.full_path, 'rb') as f: self.assertEqual(self.data, f.read()) async def test_stream_and_save_file_without_blobs(self): self.data = get_random_bytes((MAX_BLOB_SIZE - 1) * 4) await self._setup_stream(self.data, save_files=True) self.daemon.conf.save_blobs = False await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) full_path = stream.full_path files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) for _ in range(3): await self._test_range_requests() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) current_files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) await self._restart_stream_manager() current_files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) streams = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(1, len(streams)) stream = streams[0] self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) await self._test_range_requests() streams = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(1, len(streams)) stream = streams[0] self.assertTrue(os.path.isdir(stream.download_directory)) self.assertTrue(os.path.isfile(stream.full_path)) current_files_in_download_dir = list(os.scandir(os.path.dirname(full_path))) self.assertEqual( len(files_in_download_dir), len(current_files_in_download_dir) ) with open(stream.full_path, 'rb') as f: self.assertEqual(self.data, f.read()) async def test_switch_save_blobs_while_running(self): await self.test_streaming_only_without_blobs() self.daemon.conf.save_blobs = True blobs_in_stream = (await self.daemon.jsonrpc_file_list())['items'][0].blobs_in_stream sd_hash = (await self.daemon.jsonrpc_file_list())['items'][0].sd_hash start_file_count = len(os.listdir(self.daemon.blob_manager.blob_dir)) await self._test_range_requests() self.assertEqual(start_file_count + blobs_in_stream, len(os.listdir(self.daemon.blob_manager.blob_dir))) self.assertEqual(0, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining) # switch back self.daemon.conf.save_blobs = False await self._test_range_requests() self.assertEqual(start_file_count + blobs_in_stream, len(os.listdir(self.daemon.blob_manager.blob_dir))) self.assertEqual(0, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining) await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, sd_hash=sd_hash) self.assertEqual(start_file_count, len(os.listdir(self.daemon.blob_manager.blob_dir))) await self._test_range_requests() self.assertEqual(start_file_count, len(os.listdir(self.daemon.blob_manager.blob_dir))) self.assertEqual(blobs_in_stream, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining) async def test_file_save_streaming_only_save_blobs(self): await self.test_streaming_only_with_blobs() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.full_path) self.server.stop_server() await self.daemon.jsonrpc_file_save('test', self.daemon.conf.data_dir) stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNotNone(stream.full_path) await stream.finished_writing.wait() with open(stream.full_path, 'rb') as f: self.assertEqual(self.data, f.read()) await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, sd_hash=stream.sd_hash) async def test_file_save_stop_before_finished_streaming_only(self, wait_for_start_writing=False): await self.test_streaming_only_with_blobs() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.full_path) self.server.stop_server() await self.daemon.jsonrpc_file_save('test', self.daemon.conf.data_dir) stream = (await self.daemon.jsonrpc_file_list())['items'][0] path = stream.full_path self.assertIsNotNone(path) if wait_for_start_writing: await stream.started_writing.wait() self.assertTrue(os.path.isfile(path)) await self._restart_stream_manager() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNotNone(stream.full_path) self.assertFalse(os.path.isfile(path)) if wait_for_start_writing: await stream.started_writing.wait() self.assertTrue(os.path.isfile(path)) async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self): return await self.test_file_save_stop_before_finished_streaming_only(wait_for_start_writing=True) async def test_file_save_streaming_only_dont_save_blobs(self): await self.test_streaming_only_without_blobs() stream = (await self.daemon.jsonrpc_file_list())['items'][0] self.assertIsNone(stream.full_path) await self.daemon.jsonrpc_file_save('test', self.daemon.conf.data_dir) stream = (await self.daemon.jsonrpc_file_list())['items'][0] await stream.finished_writing.wait() with open(stream.full_path, 'rb') as f: self.assertEqual(self.data, f.read()) class RangeRequestsLRUCache(CommandTestCase): blob_lru_cache_size = 32 async def _request_stream(self): name = 'foo' url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/{name}' async with aiohttp_request('get', url) as req: self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') content_range = req.headers.get('Content-Range') content_length = int(req.headers.get('Content-Length')) streamed_bytes = await req.content.read() self.assertEqual(content_length, len(streamed_bytes)) self.assertEqual(15, content_length) self.assertEqual(b'hi\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', streamed_bytes) self.assertEqual('bytes 0-14/15', content_range) async def test_range_requests_with_blob_lru_cache(self): self.data = b'hi' self.daemon.conf.save_blobs = False self.daemon.conf.save_files = False await self.stream_create('foo', '0.01', data=self.data, file_size=0) await (await self.daemon.jsonrpc_file_list())['items'][0].fully_reflected.wait() await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo') self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir))) await self.daemon.streaming_runner.setup() site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, self.daemon.conf.streaming_port) await site.start() self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) await self._request_stream() self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.server.stop_server() # running with cache size 0 gets through without errors without # this since the server doesn't stop immediately await asyncio.sleep(1) await self._request_stream()