forked from LBRYCommunity/lbry-sdk
fix unit tests
This commit is contained in:
parent
2089059792
commit
e888e69d4d
7 changed files with 50 additions and 36 deletions
|
@ -93,10 +93,10 @@ class FileManager:
|
|||
raise
|
||||
log.exception("Unexpected error resolving stream:")
|
||||
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
|
||||
if not resolved_result:
|
||||
raise ResolveError(f"Failed to resolve stream at '{uri}'")
|
||||
if 'error' in resolved_result:
|
||||
raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
|
||||
if not resolved_result or uri not in resolved_result:
|
||||
raise ResolveError(f"Failed to resolve stream at '{uri}'")
|
||||
|
||||
txo = resolved_result[uri]
|
||||
claim = txo.claim
|
||||
|
@ -166,11 +166,13 @@ class FileManager:
|
|||
####################
|
||||
|
||||
if not claim.stream.source.bt_infohash:
|
||||
# fixme: this shouldnt be here
|
||||
stream = ManagedStream(
|
||||
self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
|
||||
download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
|
||||
analytics_manager=self.analytics_manager
|
||||
)
|
||||
stream.downloader.node = source_manager.node
|
||||
else:
|
||||
stream = None
|
||||
log.info("starting download for %s", uri)
|
||||
|
|
|
@ -92,8 +92,8 @@ class StreamDownloader:
|
|||
|
||||
async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
|
||||
# set up peer accumulation
|
||||
if node:
|
||||
self.node = node
|
||||
self.node = node or self.node # fixme: this shouldnt be set here!
|
||||
if self.node:
|
||||
if self.accumulate_task and not self.accumulate_task.done():
|
||||
self.accumulate_task.cancel()
|
||||
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
|
||||
|
|
|
@ -54,6 +54,10 @@ class StreamManager(SourceManager):
|
|||
self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
|
||||
self.started = asyncio.Event(loop=self.loop)
|
||||
|
||||
@property
|
||||
def streams(self):
|
||||
return self._sources
|
||||
|
||||
def add(self, source: ManagedStream):
|
||||
super().add(source)
|
||||
self.storage.content_claim_callbacks[source.stream_hash] = lambda: self._update_content_claim(source)
|
||||
|
|
|
@ -16,6 +16,7 @@ class TestComponentManager(AsyncioTestCase):
|
|||
[
|
||||
components.DatabaseComponent,
|
||||
components.ExchangeRateManagerComponent,
|
||||
components.TorrentComponent,
|
||||
components.UPnPComponent
|
||||
],
|
||||
[
|
||||
|
@ -24,9 +25,9 @@ class TestComponentManager(AsyncioTestCase):
|
|||
components.WalletComponent
|
||||
],
|
||||
[
|
||||
components.FileManagerComponent,
|
||||
components.HashAnnouncerComponent,
|
||||
components.PeerProtocolServerComponent,
|
||||
components.FileManagerComponent,
|
||||
components.WalletServerPaymentsComponent
|
||||
]
|
||||
]
|
||||
|
@ -135,8 +136,8 @@ class FakeDelayedBlobManager(FakeComponent):
|
|||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
class FakeDelayedStreamManager(FakeComponent):
|
||||
component_name = "stream_manager"
|
||||
class FakeDelayedFileManager(FakeComponent):
|
||||
component_name = "file_manager"
|
||||
depends_on = [FakeDelayedBlobManager.component_name]
|
||||
|
||||
async def start(self):
|
||||
|
@ -153,7 +154,7 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
|
|||
PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT,
|
||||
EXCHANGE_RATE_MANAGER_COMPONENT],
|
||||
wallet=FakeDelayedWallet,
|
||||
stream_manager=FakeDelayedStreamManager,
|
||||
file_manager=FakeDelayedFileManager,
|
||||
blob_manager=FakeDelayedBlobManager
|
||||
)
|
||||
|
||||
|
@ -163,17 +164,17 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
|
|||
await self.advance(0)
|
||||
self.assertTrue(self.component_manager.get_component('wallet').running)
|
||||
self.assertFalse(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('file_manager').running)
|
||||
|
||||
await self.advance(1)
|
||||
self.assertTrue(self.component_manager.get_component('wallet').running)
|
||||
self.assertTrue(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('file_manager').running)
|
||||
|
||||
await self.advance(1)
|
||||
self.assertTrue(self.component_manager.get_component('wallet').running)
|
||||
self.assertTrue(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('file_manager').running)
|
||||
|
||||
async def test_proper_stopping_of_components(self):
|
||||
asyncio.create_task(self.component_manager.start())
|
||||
|
@ -182,18 +183,18 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
|
|||
await self.advance(1)
|
||||
self.assertTrue(self.component_manager.get_component('wallet').running)
|
||||
self.assertTrue(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('file_manager').running)
|
||||
|
||||
asyncio.create_task(self.component_manager.stop())
|
||||
await self.advance(0)
|
||||
self.assertFalse(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('file_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('wallet').running)
|
||||
await self.advance(1)
|
||||
self.assertFalse(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('file_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertTrue(self.component_manager.get_component('wallet').running)
|
||||
await self.advance(1)
|
||||
self.assertFalse(self.component_manager.get_component('stream_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('file_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('blob_manager').running)
|
||||
self.assertFalse(self.component_manager.get_component('wallet').running)
|
||||
|
|
|
@ -76,7 +76,7 @@ class TestManagedStream(BlobExchangeTestBase):
|
|||
return q2, self.loop.create_task(_task())
|
||||
|
||||
mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
|
||||
self.stream.node = mock_node
|
||||
self.stream.downloader.node = mock_node
|
||||
await self.stream.save_file()
|
||||
await self.stream.finished_write_attempt.wait()
|
||||
self.assertTrue(os.path.isfile(self.stream.full_path))
|
||||
|
@ -110,7 +110,6 @@ class TestManagedStream(BlobExchangeTestBase):
|
|||
await self.setup_stream(2)
|
||||
|
||||
mock_node = mock.Mock(spec=Node)
|
||||
q = asyncio.Queue()
|
||||
|
||||
bad_peer = make_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334, allow_localhost=True)
|
||||
|
||||
|
@ -124,7 +123,7 @@ class TestManagedStream(BlobExchangeTestBase):
|
|||
|
||||
mock_node.accumulate_peers = _mock_accumulate_peers
|
||||
|
||||
self.stream.node = mock_node
|
||||
self.stream.downloader.node = mock_node
|
||||
await self.stream.save_file()
|
||||
await self.stream.finished_writing.wait()
|
||||
self.assertTrue(os.path.isfile(self.stream.full_path))
|
||||
|
|
|
@ -39,7 +39,7 @@ class TestStreamAssembler(AsyncioTestCase):
|
|||
with open(file_path, 'wb') as f:
|
||||
f.write(self.cleartext)
|
||||
|
||||
self.stream = await self.stream_manager.create_stream(file_path)
|
||||
self.stream = await self.stream_manager.create(file_path)
|
||||
|
||||
async def _test_reflect_stream(self, response_chunk_size):
|
||||
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size)
|
||||
|
|
|
@ -5,6 +5,8 @@ from unittest import mock
|
|||
import asyncio
|
||||
import json
|
||||
from decimal import Decimal
|
||||
|
||||
from lbry.file.file_manager import FileManager
|
||||
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
|
||||
from lbry.testcase import get_fake_exchange_rate_manager
|
||||
from lbry.utils import generate_id
|
||||
|
@ -110,10 +112,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
|
|||
|
||||
async def mock_resolve(*args, **kwargs):
|
||||
result = {txo.meta['permanent_url']: txo}
|
||||
claims = [
|
||||
StreamManager._convert_to_old_resolve_output(manager, result)[txo.meta['permanent_url']]
|
||||
]
|
||||
await storage.save_claims(claims)
|
||||
await storage.save_claim_from_output(ledger, txo)
|
||||
return result
|
||||
manager.ledger.resolve = mock_resolve
|
||||
|
||||
|
@ -138,11 +137,20 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
)
|
||||
self.sd_hash = descriptor.sd_hash
|
||||
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
|
||||
self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
|
||||
self.client_storage, get_mock_node(self.server_from_client),
|
||||
AnalyticsManager(self.client_config,
|
||||
analytics_manager = AnalyticsManager(
|
||||
self.client_config,
|
||||
binascii.hexlify(generate_id()).decode(),
|
||||
binascii.hexlify(generate_id()).decode()))
|
||||
binascii.hexlify(generate_id()).decode()
|
||||
)
|
||||
self.stream_manager = StreamManager(
|
||||
self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
|
||||
self.client_storage, get_mock_node(self.server_from_client),
|
||||
analytics_manager
|
||||
)
|
||||
self.file_manager = FileManager(
|
||||
self.loop, self.client_config, self.mock_wallet, self.client_storage, analytics_manager
|
||||
)
|
||||
self.file_manager.source_managers['stream'] = self.stream_manager
|
||||
self.exchange_rate_manager = get_fake_exchange_rate_manager()
|
||||
|
||||
async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None):
|
||||
|
@ -159,9 +167,9 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
self.stream_manager.analytics_manager._post = _check_post
|
||||
if error:
|
||||
with self.assertRaises(error):
|
||||
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||
await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||
else:
|
||||
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||
await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||
await asyncio.sleep(0, loop=self.loop)
|
||||
self.assertTrue(checked_analytics_event)
|
||||
|
||||
|
@ -281,7 +289,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
self.stream_manager.analytics_manager._post = check_post
|
||||
|
||||
self.assertDictEqual(self.stream_manager.streams, {})
|
||||
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||
stream_hash = stream.stream_hash
|
||||
self.assertDictEqual(self.stream_manager.streams, {stream.sd_hash: stream})
|
||||
self.assertTrue(stream.running)
|
||||
|
@ -302,7 +310,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
)
|
||||
self.assertEqual(stored_status, "stopped")
|
||||
|
||||
stream.node = self.stream_manager.node
|
||||
stream.downloader.node = self.stream_manager.node
|
||||
await stream.save_file()
|
||||
await stream.finished_writing.wait()
|
||||
await asyncio.sleep(0, loop=self.loop)
|
||||
|
@ -314,7 +322,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
)
|
||||
self.assertEqual(stored_status, "finished")
|
||||
|
||||
await self.stream_manager.delete_stream(stream, True)
|
||||
await self.stream_manager.delete(stream, True)
|
||||
self.assertDictEqual(self.stream_manager.streams, {})
|
||||
self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file")))
|
||||
stored_status = await self.client_storage.run_and_return_one_or_none(
|
||||
|
@ -326,7 +334,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
async def _test_download_error_on_start(self, expected_error, timeout=None):
|
||||
error = None
|
||||
try:
|
||||
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
|
||||
await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager, timeout)
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise
|
||||
|
@ -402,7 +410,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
last_blob_hash = json.loads(sdf.read())['blobs'][-2]['blob_hash']
|
||||
self.server_blob_manager.delete_blob(last_blob_hash)
|
||||
self.client_config.blob_download_timeout = 0.1
|
||||
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||
await stream.started_writing.wait()
|
||||
self.assertEqual('running', stream.status)
|
||||
self.assertIsNotNone(stream.full_path)
|
||||
|
@ -434,7 +442,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
self.stream_manager.analytics_manager._post = check_post
|
||||
|
||||
self.assertDictEqual(self.stream_manager.streams, {})
|
||||
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||
await stream.finished_writing.wait()
|
||||
await asyncio.sleep(0, loop=self.loop)
|
||||
self.stream_manager.stop()
|
||||
|
|
Loading…
Reference in a new issue