diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 1f27a1812..8162322e1 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -381,7 +381,7 @@ class FileManagerComponent(Component): class BackgroundDownloader(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT - depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT] + depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -399,35 +399,11 @@ class BackgroundDownloader(Component): return self.status async def loop(self): - db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT) while True: - for channel_id, download_latest, download_all in await db.get_subscriptions(): - amount = 1_000_000 if download_all else download_latest - if not amount: - continue - await self.ensure_download(channel_id, amount) self.finished_iteration.set() self.finished_iteration.clear() await asyncio.sleep(self.download_loop_delay_seconds) - async def ensure_download(self, channel_id, amount): - wallet = self.component_manager.get_component(WALLET_COMPONENT) - ledger = wallet.ledger - claims, _, _, _ = await ledger.claim_search( - ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height']) - offset = 0 - while claims and amount > 0: - for claim in claims: - offset += 1 - if not claim.script.source or claim.has_price: - continue - await self.download_blobs(claim.claim.stream.source.sd_hash) - amount -= 1 - if amount == 0: - break - claims, _, _, _ = await ledger.claim_search( - ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset) - async def download_blobs(self, sd_hash): blob_manager = self.component_manager.get_component(BLOB_COMPONENT) downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 6aebbeb85..3d0ad6166 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -574,7 +574,7 @@ class DiskSpaceManagement(CommandTestCase): class TestProactiveDownloaderComponent(CommandTestCase): - async def assertBlobs(self, *sd_hashes): + async def assertBlobs(self, *sd_hashes, no_files=True): # checks that we have ony the finished blobs needed for the the referenced streams seen = set(sd_hashes) for sd_hash in sd_hashes: @@ -586,39 +586,35 @@ class TestProactiveDownloaderComponent(CommandTestCase): self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified()) seen.update(blob.blob_hash for blob in blobs if blob.blob_hash) self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes) + if no_files: + self.assertEqual(0, len(await self.file_list())) - async def test_ensure_download(self): - unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01')) - channel_id = self.get_claim_id(await self.channel_create('@cool')) - content1 = await self.stream_create('content1', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23))) - content1 = content1['outputs'][0]['value']['source']['sd_hash'] - content2 = await self.stream_create('content2', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23))) - content2 = content2['outputs'][0]['value']['source']['sd_hash'] - await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD') - await self.stream_repost(unrelated_claim_id, 'repost') + async def clear(self): await self.daemon.jsonrpc_file_delete(delete_all=True) self.assertEqual(0, len(await self.file_list())) + await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) + async def test_ensure_download(self): + content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 23))) + content1 = content1['outputs'][0]['value']['source']['sd_hash'] + content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23))) + content2 = content2['outputs'][0]['value']['source']['sd_hash'] + proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) - await proactive_downloader.ensure_download(channel_id, 1) + await self.clear() + await proactive_downloader.download_blobs(content1) await self.assertBlobs(content1) - await proactive_downloader.ensure_download(channel_id, 2) - await self.assertBlobs(content1, content2) - # ignores paid content - await proactive_downloader.ensure_download(channel_id, 3) - await self.assertBlobs(content1, content2) - # ignores reposts - await proactive_downloader.ensure_download(channel_id, 4) + await proactive_downloader.download_blobs(content2) await self.assertBlobs(content1, content2) + await self.clear() + await proactive_downloader.download_blobs(content2) + await self.assertBlobs(content2) # tests that an attempt to download something that isn't a sd blob will download the single blob and stop blobs = await self.daemon.storage.get_blobs_for_stream( await self.daemon.storage.get_stream_hash_for_sd_hash(content1) ) - await self.daemon.jsonrpc_file_delete(delete_all=True) - self.assertEqual(0, len(await self.file_list())) - await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) - self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) + await self.clear() await proactive_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)