import os import shutil import unittest from unittest import mock import asyncio from lbry.blob.blob_file import MAX_BLOB_SIZE from lbry.blob_exchange.serialization import BlobResponse from lbry.blob_exchange.server import BlobServerProtocol from lbry.dht.node import Node from lbry.dht.peer import make_kademlia_peer from lbry.stream.managed_stream import ManagedStream from lbry.stream.descriptor import StreamDescriptor from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase def get_mock_node(loop): mock_node = mock.Mock(spec=Node) mock_node.joined = asyncio.Event(loop=loop) mock_node.joined.set() return mock_node class TestManagedStream(BlobExchangeTestBase): async def create_stream(self, blob_count: int = 10, file_name='test_file'): self.stream_bytes = b'' for _ in range(blob_count): self.stream_bytes += os.urandom(MAX_BLOB_SIZE - 1) # create the stream file_path = os.path.join(self.server_dir, file_name) with open(file_path, 'wb') as f: f.write(self.stream_bytes) descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path) self.sd_hash = descriptor.calculate_sd_hash() return descriptor async def setup_stream(self, blob_count: int = 10): await self.create_stream(blob_count) self.stream = ManagedStream( self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir ) async def test_client_sanitizes_file_name(self): illegal_name = 't