diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 37fc056b3..a0de756ef 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -56,7 +56,6 @@ class StreamAssembler: self.stream_handle.flush() self.written_bytes += len(_decrypted) log.debug("decrypted %s", blob.blob_hash[:8]) - self.wrote_bytes_event.set() await self.loop.run_in_executor(None, _decrypt_and_write) return True @@ -100,6 +99,9 @@ class StreamAssembler: if await self._decrypt_blob(blob, blob_info, self.descriptor.key): await self.blob_manager.blob_completed(blob) written_blobs = i + if not self.wrote_bytes_event.is_set(): + self.wrote_bytes_event.set() + log.debug("written %i/%i", written_blobs, len(self.descriptor.blobs) - 2) break except FileNotFoundError: log.debug("stream assembler stopped") @@ -114,7 +116,7 @@ class StreamAssembler: self.stream_finished_event.set() await self.after_finished() else: - log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs, + log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs or 0, len(self.descriptor.blobs) - 2) async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 1b77d572c..7edd3f8d1 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -279,7 +279,6 @@ class StreamManager: fee_amount: typing.Optional[float] = 0.0, fee_address: typing.Optional[str] = None, should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]: - log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) claim = ClaimDict.load_dict(claim_info['value']) sd_hash = claim.source_hash.decode() if sd_hash in self.starting_streams: @@ -306,7 +305,6 @@ class StreamManager: finally: if sd_hash in self.starting_streams: del self.starting_streams[sd_hash] - log.info("returned from get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]: streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams)) diff --git a/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py b/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py index 0b6701edd..3eff3a17d 100644 --- a/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py +++ b/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py @@ -35,6 +35,14 @@ class DummyExchangeRateManager(exchange_rate_manager.ExchangeRateManager): feed.market, rates[feed.market]['spot'], rates[feed.market]['ts']) +def get_dummy_exchange_rate_manager(time): + rates = { + 'BTCLBC': {'spot': 3.0, 'ts': time.time() + 1}, + 'USDBTC': {'spot': 2.0, 'ts': time.time() + 2} + } + return DummyExchangeRateManager([BTCLBCFeed()], rates) + + class FeeFormatTest(unittest.TestCase): def test_fee_created_with_correct_inputs(self): fee_dict = { diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py new file mode 100644 index 000000000..6b567d70c --- /dev/null +++ b/tests/unit/stream/test_stream_manager.py @@ -0,0 +1,106 @@ +import os +import binascii +from unittest import mock +import asyncio +import time +from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase +from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager + +from lbrynet.extras.wallet.manager import LbryWalletManager +from lbrynet.stream.stream_manager import StreamManager +from lbrynet.stream.descriptor import StreamDescriptor +from lbrynet.dht.node import Node +from lbrynet.schema.claim import ClaimDict + + +def get_mock_node(peer): + def mock_accumulate_peers(q1: asyncio.Queue, q2: asyncio.Queue): + async def _task(): + pass + + q2.put_nowait([peer]) + return q2, asyncio.create_task(_task()) + + mock_node = mock.Mock(spec=Node) + mock_node.accumulate_peers = mock_accumulate_peers + return mock_node + + +def get_mock_wallet(sd_hash, storage): + claim = { + "address": "bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF", + "amount": "0.1", + "claim_id": "c49566d631226492317d06ad7fdbe1ed32925124", + "claim_sequence": 1, + "decoded_claim": True, + "depth": 1057, + "effective_amount": "0.1", + "has_signature": False, + "height": 514081, + "hex": "", + "name": "33rpm", + "nout": 0, + "permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124", + "supports": [], + "txid": "81ac52662af926fdf639d56920069e0f63449d4cde074c61717cb99ddde40e3c", + "value": { + "claimType": "streamType", + "stream": { + "metadata": { + "author": "", + "description": "", + "language": "en", + "license": "None", + "licenseUrl": "", + "nsfw": False, + "preview": "", + "thumbnail": "", + "title": "33rpm", + "version": "_0_1_0" + }, + "source": { + "contentType": "image/png", + "source": sd_hash, + "sourceType": "lbry_sd_hash", + "version": "_0_0_1" + }, + "version": "_0_0_1" + }, + "version": "_0_0_1" + } + } + claim_dict = ClaimDict.load_dict(claim['value']) + claim['hex'] = binascii.hexlify(claim_dict.serialized).decode() + + async def mock_resolve(*args): + await storage.save_claims([claim]) + return { + claim['permanent_url']: claim + } + + mock_wallet = mock.Mock(spec=LbryWalletManager) + mock_wallet.resolve = mock_resolve + return mock_wallet, claim['permanent_url'] + + +class TestStreamManager(BlobExchangeTestBase): + async def asyncSetUp(self): + await super().asyncSetUp() + file_path = os.path.join(self.server_dir, "test_file") + with open(file_path, 'wb') as f: + f.write(os.urandom(20000000)) + descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path) + self.sd_hash = descriptor.calculate_sd_hash() + self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage) + self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, + self.client_storage, get_mock_node(self.server_from_client)) + self.exchange_rate_manager = get_dummy_exchange_rate_manager(time) + + async def test_download_from_uri(self): + self.assertSetEqual(self.stream_manager.streams, set()) + stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + self.assertTrue(stream.running) + self.assertFalse(stream.finished) + await stream.downloader.stream_finished_event.wait() + self.assertTrue(stream.finished) + self.assertFalse(stream.running)