This commit is contained in:
Jack Robison 2019-01-30 17:43:02 -05:00
parent ca5c638124
commit 6ad68eabd9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 33 additions and 32 deletions

View file

@ -81,9 +81,9 @@ class StreamDownloader(StreamAssembler):
KademliaPeer(self.loop, address=(await resolve_host(self.loop, url)), tcp_port=port + 1)
for url, port in self.config.reflector_servers
])
self.fixed_peers_handle = self.loop.call_later(self.config.fixed_peer_delay, self.loop.create_task,
_add_fixed_peers())
if self.config.reflector_servers:
self.fixed_peers_handle = self.loop.call_later(self.config.fixed_peer_delay, self.loop.create_task,
_add_fixed_peers())
def download(self, node: typing.Optional['Node'] = None):
self.node = node

View file

@ -32,12 +32,15 @@ class BlobExchangeTestBase(AsyncioTestCase):
self.server_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.client_dir)
self.addCleanup(shutil.rmtree, self.server_dir)
self.server_storage = SQLiteStorage(Config(), os.path.join(self.server_dir, "lbrynet.sqlite"))
self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
reflector_servers=[])
self.server_storage = SQLiteStorage(self.server_config, os.path.join(self.server_dir, "lbrynet.sqlite"))
self.server_blob_manager = BlobFileManager(self.loop, self.server_dir, self.server_storage)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.client_storage = SQLiteStorage(Config(), os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
reflector_servers=[])
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_blob_manager = BlobFileManager(self.loop, self.client_dir, self.client_storage)
self.client_peer_manager = PeerManager(self.loop)
self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)

View file

@ -7,8 +7,8 @@ from lbrynet.conf import Config
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.assembler import StreamAssembler
from lbrynet.stream.descriptor import StreamDescriptor
class TestStreamAssembler(AsyncioTestCase):

View file

@ -1,7 +1,8 @@
import os
import unittest
from unittest import mock
import asyncio
import contextlib
from lbrynet.conf import Config
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.downloader import StreamDownloader
from lbrynet.dht.node import Node
@ -21,22 +22,21 @@ class TestStreamDownloader(BlobExchangeTestBase):
f.write(self.stream_bytes)
descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path)
self.sd_hash = descriptor.calculate_sd_hash()
self.downloader = StreamDownloader(self.loop, self.client_blob_manager, self.sd_hash, 3, 3, self.client_dir)
conf = Config(data_dir=self.server_dir, wallet_dir=self.server_dir, download_dir=self.server_dir,
reflector_servers=[])
self.downloader = StreamDownloader(self.loop, conf, self.client_blob_manager, self.sd_hash)
async def _test_transfer_stream(self, blob_count: int, mock_peer_search=None):
async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None):
await self.setup_stream(blob_count)
mock_node = mock.Mock(spec=Node)
@contextlib.asynccontextmanager
async def _mock_peer_search(*_):
async def _gen():
yield [self.server_from_client]
return
def _mock_accumulate_peers(q1, q2):
async def _task():
pass
q2.put_nowait([self.server_from_client])
return q2, self.loop.create_task(_task())
yield _gen()
mock_node.stream_peer_search_junction = mock_peer_search or _mock_peer_search
mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
self.downloader.download(mock_node)
await self.downloader.stream_finished_event.wait()
@ -48,32 +48,30 @@ class TestStreamDownloader(BlobExchangeTestBase):
async def test_transfer_stream(self):
await self._test_transfer_stream(10)
# async def test_transfer_hundred_blob_stream(self):
# await self._test_transfer_stream(100)
@unittest.SkipTest
async def test_transfer_hundred_blob_stream(self):
await self._test_transfer_stream(100)
async def test_transfer_stream_bad_first_peer_good_second(self):
await self.setup_stream(2)
mock_node = mock.Mock(spec=Node)
q = asyncio.Queue()
bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334)
@contextlib.asynccontextmanager
async def mock_peer_search(*_):
async def _gen():
await asyncio.sleep(0.05, loop=self.loop)
yield [bad_peer]
await asyncio.sleep(0.1, loop=self.loop)
yield [self.server_from_client]
return
def _mock_accumulate_peers(q1, q2):
async def _task():
pass
yield _gen()
q2.put_nowait([bad_peer])
self.loop.call_later(1, q2.put_nowait, [self.server_from_client])
return q2, self.loop.create_task(_task())
mock_node.stream_peer_search_junction = mock_peer_search
mock_node.accumulate_peers = _mock_accumulate_peers
self.downloader.download(mock_node)
await self.downloader.stream_finished_event.wait()
await self.downloader.stop()
self.assertTrue(os.path.isfile(self.downloader.output_path))
with open(self.downloader.output_path, 'rb') as f:
self.assertEqual(f.read(), self.stream_bytes)