diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index 1156b3e16..7c48e5e75 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -15,8 +15,12 @@ class DiskSpaceManager: self.task = None self.analytics = analytics + async def get_free_space_bytes(self, is_network_blob=False): + limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit + return max(0, limit_mb*1024*1024 - (await self.get_space_used_bytes(is_network_blob))) + async def get_space_used_bytes(self, is_network_blob=False): - return await self.db.get_stored_blob_disk_usage(is_orphan_blob=is_network_blob) + return await self.db.get_stored_blob_disk_usage(is_network_blob=is_network_blob) async def get_space_used_mb(self, is_network_blob=False): return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0) @@ -25,15 +29,13 @@ class DiskSpaceManager: await self._clean(False) await self._clean(True) - async def _clean(self, from_network_storage=False): - space_used_bytes = await self.get_space_used_bytes(from_network_storage) - if from_network_storage: - storage_limit = self.config.network_storage_limit*1024*1024 if self.config.network_storage_limit else None - else: - storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None + async def _clean(self, is_network_blob=False): + space_used_bytes = await self.get_space_used_bytes(is_network_blob) + storage_limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit + storage_limit = storage_limit_mb*1024*1024 if storage_limit_mb else None if self.analytics: asyncio.create_task( - self.analytics.send_disk_space_used(space_used_bytes, storage_limit, from_network_storage) + self.analytics.send_disk_space_used(space_used_bytes, storage_limit, is_network_blob) ) if not storage_limit: return 0 @@ -41,7 +43,7 @@ class DiskSpaceManager: available = storage_limit - 
space_used_bytes if available > 0: return 0 - for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, orphans=from_network_storage): + for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, is_network_blob=is_network_blob): delete.append(blob_hash) available += file_size if available > 0: diff --git a/lbry/dht/node.py b/lbry/dht/node.py index de278a5f5..4bfb3d478 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -31,6 +31,11 @@ class Node: self._refresh_task: asyncio.Task = None self._storage = storage + @property + def last_requested_blob_hash(self): + if len(self.protocol.data_store.requested_blobs) > 0: + return self.protocol.data_store.requested_blobs[-1] + async def refresh_node(self, force_once=False): while True: # remove peers with expired blob announcements from the datastore diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index e919e8519..17ef80695 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -4,7 +4,6 @@ import asyncio import logging import binascii import typing -from collections import deque import base58 @@ -19,7 +18,7 @@ from lbry.dht.blob_announcer import BlobAnnouncer from lbry.blob.blob_manager import BlobManager from lbry.blob.disk_space_manager import DiskSpaceManager from lbry.blob_exchange.server import BlobServer -from lbry.stream.downloader import StreamDownloader +from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.stream_manager import StreamManager from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component @@ -381,7 +380,7 @@ class FileManagerComponent(Component): self.file_manager.stop() -class BackgroundDownloader(Component): +class BackgroundDownloaderComponent(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT] @@ -389,57 +388,46 @@ class 
BackgroundDownloader(Component): super().__init__(component_manager) self.task: typing.Optional[asyncio.Task] = None self.download_loop_delay_seconds = 60 - self.finished_iteration = asyncio.Event() - self.requested_blobs = deque(maxlen=10) self.ongoing_download: typing.Optional[asyncio.Task] = None + self.space_manager: typing.Optional[DiskSpaceManager] = None + self.background_downloader: typing.Optional[BackgroundDownloader] = None + self.dht_node: typing.Optional[Node] = None @property - def component(self) -> 'BackgroundDownloader': + def is_busy(self): + return bool(self.ongoing_download and not self.ongoing_download.done()) + + @property + def component(self) -> 'BackgroundDownloaderComponent': return self async def get_status(self): - return {'running': self.task is not None and not self.task.done(), 'enqueued': len(self.requested_blobs)} + return {'running': self.task is not None and not self.task.done(), + 'ongoing_download': self.is_busy} async def loop(self): while True: - if self.component_manager.has_component(DHT_COMPONENT): - node = self.component_manager.get_component(DHT_COMPONENT) - self.requested_blobs = node.protocol.data_store.requested_blobs - if self.requested_blobs and (not self.ongoing_download or self.ongoing_download.done()): - blob_hash = self.requested_blobs.pop() - self.ongoing_download = asyncio.create_task(self.download_blobs(blob_hash)) - self.ongoing_download.add_done_callback(lambda _: self.finished_iteration.set()) - self.finished_iteration.clear() + if not self.is_busy and await self.space_manager.get_free_space_bytes(True) > 0: + blob_hash = self.dht_node.last_requested_blob_hash + if blob_hash: + self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash)) await asyncio.sleep(self.download_loop_delay_seconds) - async def download_blobs(self, sd_hash): - if self.conf.network_storage_limit <= 0: - return - space_manager: DiskSpaceManager = 
self.component_manager.get_component(DISK_SPACE_COMPONENT) - if (await space_manager.get_space_used_mb(True)) >= self.conf.network_storage_limit: - log.info("Allocated space for proactive downloader is full. Background download aborted.") - return - blob_manager = self.component_manager.get_component(BLOB_COMPONENT) - downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) - storage = blob_manager.storage - node = None - if self.component_manager.has_component(DHT_COMPONENT): - node = self.component_manager.get_component(DHT_COMPONENT) - try: - await downloader.start(node, save_stream=False) - except ValueError: - return - for blob_info in downloader.descriptor.blobs[:-1]: - await downloader.download_stream_blob(blob_info) - await storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) - async def start(self): + self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT) + if not self.component_manager.has_component(DHT_COMPONENT): + return + self.dht_node = self.component_manager.get_component(DHT_COMPONENT) + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + storage = self.component_manager.get_component(DATABASE_COMPONENT) + self.background_downloader = BackgroundDownloader(self.conf, storage, blob_manager, self.dht_node) self.task = asyncio.create_task(self.loop()) async def stop(self): if self.ongoing_download and not self.ongoing_download.done(): self.ongoing_download.cancel() - self.task.cancel() + if self.task: + self.task.cancel() class DiskSpaceComponent(Component): diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index a0aa29adc..054bb109d 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -442,9 +442,9 @@ class SQLiteStorage(SQLiteMixin): def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") - async def get_stored_blobs(self, is_mine: bool, 
orphans=False): + async def get_stored_blobs(self, is_mine: bool, is_network_blob=False): is_mine = 1 if is_mine else 0 - if orphans: + if is_network_blob: return await self.db.execute_fetchall( "select blob.blob_hash, blob.blob_length, blob.added_on " "from blob left join stream_blob using (blob_hash) " @@ -466,14 +466,14 @@ class SQLiteStorage(SQLiteMixin): ) return normal_blobs + sd_blobs - async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_orphan_blob: bool = False): + async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_network_blob: bool = False): sql = "select coalesce(sum(blob_length), 0) " - if is_orphan_blob: + if is_network_blob: sql += "from blob left join stream_blob using (blob_hash) where stream_blob.stream_hash is null" else: sql += "from blob join stream_blob using (blob_hash)" if is_mine is not None: - sql += f'{(" and " if is_orphan_blob else " where ")} is_mine=?' + sql += f'{(" and " if is_network_blob else " where ")} is_mine=?' 
args = (1 if is_mine else 0,) if is_mine is not None else () return (await self.db.execute_fetchone(sql, args))[0] diff --git a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py index d2182831f..d02a7c9c0 100644 --- a/lbry/stream/background_downloader.py +++ b/lbry/stream/background_downloader.py @@ -4,7 +4,7 @@ from lbry.stream.downloader import StreamDownloader class BackgroundDownloader: - def __init__(self, conf, storage, blob_manager, dht_node): + def __init__(self, conf, storage, blob_manager, dht_node=None): self.storage = storage self.blob_manager = blob_manager self.node = dht_node @@ -18,4 +18,4 @@ class BackgroundDownloader: return for blob_info in downloader.descriptor.blobs[:-1]: await downloader.download_stream_blob(blob_info) - await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) \ No newline at end of file + await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index b7d4a05ad..03572dcec 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -4,6 +4,7 @@ import os from binascii import hexlify from lbry.schema import Claim +from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.testcase import CommandTestCase from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT @@ -611,8 +612,7 @@ class TestProactiveDownloaderComponent(CommandTestCase): content2 = content2['outputs'][0]['value']['source']['sd_hash'] self.assertEqual('48', (await self.status())['disk_space']['space_used']) - proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) - self.daemon.conf.network_storage_limit = 100 + proactive_downloader = 
BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager) await self.clear() self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used']) @@ -636,34 +636,14 @@ class TestProactiveDownloaderComponent(CommandTestCase): await proactive_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) - # trigger from requested blobs - await self.clear() - await proactive_downloader.stop() - proactive_downloader.requested_blobs.append(content1) - finished = proactive_downloader.finished_iteration.wait() - await proactive_downloader.start() - await finished - await self.assertBlobs(content1) - await self.clear() # test that disk space manager doesn't delete orphan network blobs await proactive_downloader.download_blobs(content1) await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning await self.daemon.jsonrpc_get("content2", save_file=False) - while (await self.file_list())[0]['status'] == 'running': + while (await self.file_list())[0]['status'] != 'stopped': await asyncio.sleep(0.5) await self.assertBlobs(content1, no_files=False) self.daemon.conf.blob_storage_limit = 1 await self.blob_clean() await self.assertBlobs(content1, no_files=False) - - # downloading above limit triggers cleanup - self.daemon.conf.network_storage_limit = 6 - with self.assertLogs() as log: - await proactive_downloader.download_blobs(content2) - self.assertIn('Allocated space for proactive downloader is full.', log.output[0]) - await self.assertBlobs(content1, no_files=False) - self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used']) - await self.blob_clean() - self.assertLessEqual(int((await self.status())['disk_space']['network_seeding_space_used']), - self.daemon.conf.network_storage_limit) diff --git 
a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index 7e03f5908..a237c1ac8 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -33,7 +33,7 @@ class TestComponentManager(AsyncioTestCase): components.WalletServerPaymentsComponent ], [ - components.BackgroundDownloader, + components.BackgroundDownloaderComponent, ] ] self.component_manager = ComponentManager(Config())