From e6663603a638cda5d635b8ef35d79c0dafad8754 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 29 Jan 2020 13:49:14 -0300 Subject: [PATCH] fix unit tests --- lbry/file/file_manager.py | 6 ++- lbry/stream/downloader.py | 4 +- lbry/stream/stream_manager.py | 4 ++ .../unit/components/test_component_manager.py | 23 +++++----- tests/unit/stream/test_managed_stream.py | 5 +-- tests/unit/stream/test_reflector.py | 2 +- tests/unit/stream/test_stream_manager.py | 42 +++++++++++-------- 7 files changed, 50 insertions(+), 36 deletions(-) diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 8da9a5619..765cd1b53 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -93,10 +93,10 @@ class FileManager: raise log.exception("Unexpected error resolving stream:") raise ResolveError(f"Unexpected error resolving stream: {str(err)}") - if not resolved_result: - raise ResolveError(f"Failed to resolve stream at '{uri}'") if 'error' in resolved_result: raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}") + if not resolved_result or uri not in resolved_result: + raise ResolveError(f"Failed to resolve stream at '{uri}'") txo = resolved_result[uri] claim = txo.claim @@ -166,11 +166,13 @@ class FileManager: #################### if not claim.stream.source.bt_infohash: + # fixme: this shouldnt be here stream = ManagedStream( self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, analytics_manager=self.analytics_manager ) + stream.downloader.node = source_manager.node else: stream = None log.info("starting download for %s", uri) diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 9fe98ac54..588263b0e 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -92,8 +92,8 @@ class StreamDownloader: async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0): # set up peer accumulation - if node: - self.node = node + self.node = node or self.node # fixme: this shouldnt be set here! + if self.node: if self.accumulate_task and not self.accumulate_task.done(): self.accumulate_task.cancel() _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index da9fc5fac..2743584df 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -54,6 +54,10 @@ class StreamManager(SourceManager): self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {} self.started = asyncio.Event(loop=self.loop) + @property + def streams(self): + return self._sources + def add(self, source: ManagedStream): super().add(source) self.storage.content_claim_callbacks[source.stream_hash] = lambda: self._update_content_claim(source) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index 6738c14e4..b4e81fed7 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -16,6 +16,7 @@ class TestComponentManager(AsyncioTestCase): [ components.DatabaseComponent, components.ExchangeRateManagerComponent, + components.TorrentComponent, components.UPnPComponent ], [ @@ -24,9 +25,9 @@ class TestComponentManager(AsyncioTestCase): components.WalletComponent ], [ + components.FileManagerComponent, components.HashAnnouncerComponent, components.PeerProtocolServerComponent, - components.FileManagerComponent, components.WalletServerPaymentsComponent ] ] @@ -135,8 +136,8 @@ class FakeDelayedBlobManager(FakeComponent): await asyncio.sleep(1) -class FakeDelayedStreamManager(FakeComponent): - component_name = "stream_manager" +class FakeDelayedFileManager(FakeComponent): + component_name = "file_manager" depends_on = [FakeDelayedBlobManager.component_name] async def start(self): @@ -153,7 +154,7 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase): PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT], wallet=FakeDelayedWallet, - stream_manager=FakeDelayedStreamManager, + file_manager=FakeDelayedFileManager, blob_manager=FakeDelayedBlobManager ) @@ -163,17 +164,17 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase): await self.advance(0) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertFalse(self.component_manager.get_component('blob_manager').running) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) await self.advance(1) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) await self.advance(1) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) - self.assertTrue(self.component_manager.get_component('stream_manager').running) + self.assertTrue(self.component_manager.get_component('file_manager').running) async def test_proper_stopping_of_components(self): asyncio.create_task(self.component_manager.start()) @@ -182,18 +183,18 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase): await self.advance(1) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) - self.assertTrue(self.component_manager.get_component('stream_manager').running) + self.assertTrue(self.component_manager.get_component('file_manager').running) asyncio.create_task(self.component_manager.stop()) await self.advance(0) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) self.assertTrue(self.component_manager.get_component('wallet').running) await self.advance(1) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) self.assertFalse(self.component_manager.get_component('blob_manager').running) self.assertTrue(self.component_manager.get_component('wallet').running) await self.advance(1) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) self.assertFalse(self.component_manager.get_component('blob_manager').running) self.assertFalse(self.component_manager.get_component('wallet').running) diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py index 64e3e3ea2..3542c60e4 100644 --- a/tests/unit/stream/test_managed_stream.py +++ b/tests/unit/stream/test_managed_stream.py @@ -76,7 +76,7 @@ class TestManagedStream(BlobExchangeTestBase): return q2, self.loop.create_task(_task()) mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers - self.stream.node = mock_node + self.stream.downloader.node = mock_node await self.stream.save_file() await self.stream.finished_write_attempt.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) @@ -110,7 +110,6 @@ class TestManagedStream(BlobExchangeTestBase): await self.setup_stream(2) mock_node = mock.Mock(spec=Node) - q = asyncio.Queue() bad_peer = make_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334, allow_localhost=True) @@ -124,7 +123,7 @@ class TestManagedStream(BlobExchangeTestBase): mock_node.accumulate_peers = _mock_accumulate_peers - self.stream.node = mock_node + self.stream.downloader.node = mock_node await self.stream.save_file() await self.stream.finished_writing.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 8c228f92c..c2febd64d 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -39,7 +39,7 @@ class TestStreamAssembler(AsyncioTestCase): with open(file_path, 'wb') as f: f.write(self.cleartext) - self.stream = await self.stream_manager.create_stream(file_path) + self.stream = await self.stream_manager.create(file_path) async def _test_reflect_stream(self, response_chunk_size): reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size) diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 4c9bf40fc..5fcb68593 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -5,6 +5,8 @@ from unittest import mock import asyncio import json from decimal import Decimal + +from lbry.file.file_manager import FileManager from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from lbry.testcase import get_fake_exchange_rate_manager from lbry.utils import generate_id @@ -107,10 +109,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): async def mock_resolve(*args): result = {txo.meta['permanent_url']: txo} - claims = [ - StreamManager._convert_to_old_resolve_output(manager, result)[txo.meta['permanent_url']] - ] - await storage.save_claims(claims) + await storage.save_claim_from_output(ledger, txo) return result manager.ledger.resolve = mock_resolve @@ -135,11 +134,20 @@ class TestStreamManager(BlobExchangeTestBase): ) self.sd_hash = descriptor.sd_hash self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) - self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, - self.client_storage, get_mock_node(self.server_from_client), - AnalyticsManager(self.client_config, - binascii.hexlify(generate_id()).decode(), - binascii.hexlify(generate_id()).decode())) + analytics_manager = AnalyticsManager( + self.client_config, + binascii.hexlify(generate_id()).decode(), + binascii.hexlify(generate_id()).decode() + ) + self.stream_manager = StreamManager( + self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, + self.client_storage, get_mock_node(self.server_from_client), + analytics_manager + ) + self.file_manager = FileManager( + self.loop, self.client_config, self.mock_wallet, self.client_storage, analytics_manager + ) + self.file_manager.source_managers['stream'] = self.stream_manager self.exchange_rate_manager = get_fake_exchange_rate_manager() async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None): @@ -156,9 +164,9 @@ class TestStreamManager(BlobExchangeTestBase): self.stream_manager.analytics_manager._post = _check_post if error: with self.assertRaises(error): - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) else: - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) await asyncio.sleep(0, loop=self.loop) self.assertTrue(checked_analytics_event) @@ -278,7 +286,7 @@ class TestStreamManager(BlobExchangeTestBase): self.stream_manager.analytics_manager._post = check_post self.assertDictEqual(self.stream_manager.streams, {}) - stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) stream_hash = stream.stream_hash self.assertDictEqual(self.stream_manager.streams, {stream.sd_hash: stream}) self.assertTrue(stream.running) @@ -299,7 +307,7 @@ class TestStreamManager(BlobExchangeTestBase): ) self.assertEqual(stored_status, "stopped") - stream.node = self.stream_manager.node + stream.downloader.node = self.stream_manager.node await stream.save_file() await stream.finished_writing.wait() await asyncio.sleep(0, loop=self.loop) @@ -311,7 +319,7 @@ class TestStreamManager(BlobExchangeTestBase): ) self.assertEqual(stored_status, "finished") - await self.stream_manager.delete_stream(stream, True) + await self.stream_manager.delete(stream, True) self.assertDictEqual(self.stream_manager.streams, {}) self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file"))) stored_status = await self.client_storage.run_and_return_one_or_none( @@ -323,7 +331,7 @@ class TestStreamManager(BlobExchangeTestBase): async def _test_download_error_on_start(self, expected_error, timeout=None): error = None try: - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout) + await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager, timeout) except Exception as err: if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise @@ -399,7 +407,7 @@ class TestStreamManager(BlobExchangeTestBase): last_blob_hash = json.loads(sdf.read())['blobs'][-2]['blob_hash'] self.server_blob_manager.delete_blob(last_blob_hash) self.client_config.blob_download_timeout = 0.1 - stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) await stream.started_writing.wait() self.assertEqual('running', stream.status) self.assertIsNotNone(stream.full_path) @@ -431,7 +439,7 @@ class TestStreamManager(BlobExchangeTestBase): self.stream_manager.analytics_manager._post = check_post self.assertDictEqual(self.stream_manager.streams, {}) - stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) await stream.finished_writing.wait() await asyncio.sleep(0, loop=self.loop) self.stream_manager.stop()