diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index e9fdb0573..21b6fcd4b 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -17,7 +17,7 @@ from lbry.dht.blob_announcer import BlobAnnouncer from lbry.blob.blob_manager import BlobManager from lbry.blob.disk_space_manager import DiskSpaceManager from lbry.blob_exchange.server import BlobServer -from lbry.stream.managed_stream import ManagedStream +from lbry.stream.downloader import StreamDownloader from lbry.stream.stream_manager import StreamManager from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component @@ -381,7 +381,7 @@ class FileManagerComponent(Component): class BackgroundDownloader(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT - depends_on = [FILE_MANAGER_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] + depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -411,7 +411,6 @@ class BackgroundDownloader(Component): await asyncio.sleep(self.download_loop_delay_seconds) async def ensure_download(self, channel_id, amount): - file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) wallet = self.component_manager.get_component(WALLET_COMPONENT) ledger = wallet.ledger claims, _, _, _ = await ledger.claim_search( @@ -422,17 +421,24 @@ class BackgroundDownloader(Component): offset += 1 if not claim.script.source or claim.has_price: continue - stream = await file_manager.download_from_uri( - claim.permanent_url, None, 60.0, save_file=False, wallet=wallet - ) - if isinstance(stream, ManagedStream): - await stream.save_blobs() + await self.download_blobs(claim.claim.stream.source.sd_hash) amount -= 1 if amount == 0: break claims, _, _, _ = await ledger.claim_search( ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset) + async def download_blobs(self, sd_hash): + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) + node = None + if self.component_manager.has_component(DHT_COMPONENT): + node = self.component_manager.get_component(DHT_COMPONENT) + await downloader.start(node) + await downloader.load_descriptor() + for blob_info in downloader.descriptor.blobs[:-1]: + await downloader.download_stream_blob(blob_info) + async def start(self): self.task = asyncio.create_task(self.loop()) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 51520b300..54b917f89 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -574,50 +574,63 @@ class DiskSpaceManagement(CommandTestCase): class TestProactiveDownloaderComponent(CommandTestCase): - async def assertFileList(self, *txos): - txos_names = {txo['outputs'][0]['name'] for txo in txos} - files = await self.file_list(blobs_remaining=0) - self.assertEqual(len(txos), len(files)) - file_claim_names = {file['claim_name'] for file in files} - self.assertSetEqual(txos_names, file_claim_names) + async def assertBlobs(self, *sd_hashes): + # checks that we have ony the finished blobs needed for the the referenced streams + seen = set(sd_hashes) + for sd_hash in sd_hashes: + self.assertTrue(self.daemon.blob_manager.get_blob(sd_hash).get_is_verified()) + blobs = await self.daemon.storage.get_blobs_for_stream( + await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash) + ) + for blob in blobs[:-1]: + self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified()) + seen.update(blob.blob_hash for blob in blobs if blob.blob_hash) + self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes) async def test_ensure_download(self): unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01')) channel_id = self.get_claim_id(await self.channel_create('@cool')) content1 = await self.stream_create('content1', '0.01', channel_id=channel_id) + content1 = content1['outputs'][0]['value']['source']['sd_hash'] content2 = await self.stream_create('content2', '0.01', channel_id=channel_id) + content2 = content2['outputs'][0]['value']['source']['sd_hash'] await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD') await self.stream_repost(unrelated_claim_id, 'repost') await self.daemon.jsonrpc_file_delete(delete_all=True) self.assertEqual(0, len(await self.file_list())) + self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) - await self.assertFileList() await proactive_downloader.ensure_download(channel_id, 1) - await self.assertFileList(content1) + await self.assertBlobs(content1) await proactive_downloader.ensure_download(channel_id, 2) - await self.assertFileList(content1, content2) + await self.assertBlobs(content1, content2) # ignores paid content await proactive_downloader.ensure_download(channel_id, 3) - await self.assertFileList(content1, content2) + await self.assertBlobs(content1, content2) # ignores reposts await proactive_downloader.ensure_download(channel_id, 4) - await self.assertFileList(content1, content2) + await self.assertBlobs(content1, content2) await self.daemon.jsonrpc_file_delete(delete_all=True) self.assertEqual(0, len(await self.file_list())) + await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) + self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) + await proactive_downloader.stop() await self.daemon.jsonrpc_channel_subscribe(channel_id, 1) await proactive_downloader.start() await proactive_downloader.finished_iteration.wait() - await self.assertFileList(content1) + await self.assertBlobs(content1) await self.daemon.jsonrpc_file_delete(delete_all=True) await self.daemon.jsonrpc_channel_subscribe(channel_id, download_all=True) await proactive_downloader.stop() await proactive_downloader.start() await proactive_downloader.finished_iteration.wait() - await self.assertFileList(content1, content2) + await self.assertBlobs(content1, content2) + + self.assertEqual(0, len(await self.file_list())) self.assertEqual([(channel_id, 0, 1)], await self.daemon.jsonrpc_channel_subscription_list()) await self.daemon.jsonrpc_channel_unsubscribe(channel_id)