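# lbry-sdk/tests/unit/stream/test_downloader.py
#
# 80 lines, 3.2 KiB, Python
# (Copied from the repository web view: "Raw / Normal View / History" were page controls, and the
# date lines interleaved with the code below are blame-history timestamps, not part of the program.)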

2019-01-22 18:54:17 +01:00
import os
2019-01-30 23:43:02 +01:00
import unittest
2019-01-24 00:24:43 +01:00
from unittest import mock
2019-01-22 18:54:17 +01:00
import asyncio
2019-01-30 23:43:02 +01:00
from lbrynet.conf import Config
2019-01-22 18:54:17 +01:00
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.downloader import StreamDownloader
from lbrynet.dht.node import Node
from lbrynet.dht.peer import KademliaPeer
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
class TestStreamDownloader(BlobExchangeTestBase):
    """Exercise ``StreamDownloader`` against the blob server provided by the base test case.

    Each test writes random bytes to a file in ``self.server_dir``, turns that file
    into a stream with ``StreamDescriptor.create_stream``, downloads the stream
    through a mocked DHT ``Node`` and checks that the downloaded file is
    byte-identical to the source data.
    """

    async def setup_stream(self, blob_count: int = 10) -> None:
        """Create a stream on the server side and a downloader for it.

        Sets ``self.stream_bytes`` (the source data), ``self.sd_hash`` (the stream
        descriptor hash) and ``self.downloader``.

        :param blob_count: number of ``MAX_BLOB_SIZE - 1`` byte chunks of random
            data to generate (presumably one blob per chunk -- confirm against
            ``StreamDescriptor.create_stream``).
        """
        # Build the payload in one join rather than growing a bytes object in a loop.
        self.stream_bytes = b''.join(
            os.urandom(MAX_BLOB_SIZE - 1) for _ in range(blob_count)
        )

        # create the stream
        file_path = os.path.join(self.server_dir, "test_file")
        with open(file_path, 'wb') as f:
            f.write(self.stream_bytes)
        descriptor = await StreamDescriptor.create_stream(
            self.loop, self.server_blob_manager.blob_dir, file_path
        )
        self.sd_hash = descriptor.calculate_sd_hash()

        conf = Config(
            data_dir=self.server_dir, wallet_dir=self.server_dir, download_dir=self.server_dir,
            reflector_servers=[]
        )
        self.downloader = StreamDownloader(self.loop, conf, self.client_blob_manager, self.sd_hash)

    def _assert_output_matches_stream(self) -> None:
        """Assert that the downloader's output file exists and equals ``self.stream_bytes``."""
        self.assertTrue(os.path.isfile(self.downloader.output_path))
        with open(self.downloader.output_path, 'rb') as f:
            self.assertEqual(f.read(), self.stream_bytes)

    async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None) -> None:
        """Download a ``blob_count``-chunk stream and verify the resulting file.

        :param blob_count: forwarded to :meth:`setup_stream`.
        :param mock_accumulate_peers: optional replacement for the mocked node's
            ``accumulate_peers``; when omitted, a stub that immediately offers
            ``self.server_from_client`` as the only peer is used.
        """
        await self.setup_stream(blob_count)
        mock_node = mock.Mock(spec=Node)

        def _mock_accumulate_peers(q1, q2):
            # Stand-in for Node.accumulate_peers: publish the known-good peer on
            # the second queue and return it together with a no-op task.
            async def _task():
                pass

            q2.put_nowait([self.server_from_client])
            return q2, self.loop.create_task(_task())

        mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers

        self.downloader.download(mock_node)
        await self.downloader.stream_finished_event.wait()
        await self.downloader.stop()
        self._assert_output_matches_stream()

    async def test_transfer_stream(self):
        """A 10-chunk stream downloads completely from a single good peer."""
        await self._test_transfer_stream(10)

    # ``unittest.SkipTest`` is an exception class, not a decorator: applying it to a
    # function replaces the test with a non-callable exception instance, so the
    # test silently disappears from collection.  ``unittest.skip`` reports it as
    # skipped instead.
    @unittest.skip("100-blob transfer test is disabled; remove this decorator to run it")
    async def test_transfer_hundred_blob_stream(self):
        """A 100-chunk stream downloads completely (currently skipped)."""
        await self._test_transfer_stream(100)

    async def test_transfer_stream_bad_first_peer_good_second(self):
        """The download still completes when the first offered peer is bad.

        The mocked node first offers ``bad_peer`` (127.0.0.1:3334) and only one
        second later offers the real server peer.
        """
        await self.setup_stream(2)

        mock_node = mock.Mock(spec=Node)
        bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334)

        def _mock_accumulate_peers(q1, q2):
            async def _task():
                pass

            # Offer the bad peer right away; the good peer arrives after a delay.
            q2.put_nowait([bad_peer])
            self.loop.call_later(1, q2.put_nowait, [self.server_from_client])
            return q2, self.loop.create_task(_task())

        mock_node.accumulate_peers = _mock_accumulate_peers

        self.downloader.download(mock_node)
        await self.downloader.stream_finished_event.wait()
        self._assert_output_matches_stream()
        # TODO(review): these peer-state assertions were disabled in the original;
        # re-enable once ``tcp_last_down`` tracking can be relied on here.
        # self.assertIs(self.server_from_client.tcp_last_down, None)
        # self.assertIsNot(bad_peer.tcp_last_down, None)