diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index 74463e4aa..e95a78cea 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -1,21 +1,61 @@ import os +import asyncio +import logging + +log = logging.getLogger(__name__) class DiskSpaceManager: - def __init__(self, config): + def __init__(self, config, cleaning_interval=30 * 60): self.config = config + self.cleaning_interval = cleaning_interval + self.running = False + self.task = None @property def space_used_bytes(self): used = 0 data_dir = os.path.join(self.config.data_dir, 'blobfiles') - for item in os.listdir(data_dir): - blob_path = os.path.join(data_dir, item) - if os.path.isfile(blob_path): - used += os.path.getsize(blob_path) + for item in os.scandir(data_dir): + if item.is_file: + used += item.stat().st_size return used @property def space_used_mb(self): return int(self.space_used_bytes/1024.0/1024.0) + + def clean(self): + if not self.config.blob_storage_limit: + return + used = 0 + files = [] + data_dir = os.path.join(self.config.data_dir, 'blobfiles') + for file in os.scandir(data_dir): + if file.is_file: + file_stats = file.stat() + used += file_stats.st_size + files.append((file_stats.st_mtime, file_stats.st_size, file)) + files.sort() + available = (self.config.blob_storage_limit*1024*1024) - used + for _, file_size, file in files: + available += file_size + if available > 0: + break + os.remove(file) + + async def cleaning_loop(self): + while self.running: + await asyncio.get_event_loop().run_in_executor(None, self.clean) + await asyncio.sleep(self.cleaning_interval) + + async def start(self): + self.running = True + self.task = asyncio.create_task(self.cleaning_loop()) + self.task.add_done_callback(lambda _: log.info("Stopping blob cleanup service.")) + + async def stop(self): + if self.running: + self.running = False + self.task.cancel() diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index fbaa598cb..a383b9ea5 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -390,14 +390,15 @@ class DiskSpaceComponent(Component): async def get_status(self): return { - 'used': str(self.disk_space_manager.space_used_mb), + 'space_used': str(self.disk_space_manager.space_used_mb), + 'running': self.disk_space_manager.running, } async def start(self): - pass + await self.disk_space_manager.start() async def stop(self): - pass + await self.disk_space_manager.stop() class TorrentComponent(Component): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index bf33c0541..814f4e50e 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -56,7 +56,7 @@ from lbry.schema.url import URL if typing.TYPE_CHECKING: from lbry.blob.blob_manager import BlobManager from lbry.dht.node import Node - from lbry.extras.daemon.components import UPnPComponent + from lbry.extras.daemon.components import UPnPComponent, DiskSpaceManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage from lbry.wallet import WalletManager, Ledger @@ -375,6 +375,10 @@ class Daemon(metaclass=JSONRPCServerType): def blob_manager(self) -> typing.Optional['BlobManager']: return self.component_manager.get_component(BLOB_COMPONENT) + @property + def disk_space_manager(self) -> typing.Optional['DiskSpaceManager']: + return self.component_manager.get_component(DISK_SPACE_COMPONENT) + @property def upnp(self) -> typing.Optional['UPnPComponent']: return self.component_manager.get_component(UPNP_COMPONENT) @@ -4916,6 +4920,22 @@ class Daemon(metaclass=JSONRPCServerType): raise NotImplementedError() + @requires(DISK_SPACE_COMPONENT) + async def jsonrpc_blob_clean(self): + """ + Deletes blobs to cleanup disk space + + Usage: + blob_clean + + Options: + None + + Returns: + (bool) true if successful + """ + return self.disk_space_manager.clean() + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_reflect(self, **kwargs): """ diff --git a/lbry/testcase.py b/lbry/testcase.py index 9b65e2fe8..210b99b3c 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -607,6 +607,12 @@ class CommandTestCase(IntegrationTestCase): await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs]) return self.sout(txs) + async def blob_clean(self): + return await self.out(self.daemon.jsonrpc_blob_clean()) + + async def status(self): + return await self.out(self.daemon.jsonrpc_status()) + async def resolve(self, uri, **kwargs): return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri] diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 39e9b315d..dccaca324 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -516,11 +516,15 @@ class FileCommands(CommandTestCase): class DiskSpaceManagement(CommandTestCase): async def test_file_management(self): - status = await self.daemon.jsonrpc_status() + status = await self.status() self.assertIn('disk_space', status) - self.assertEqual(status['disk_space']['used'], '0') - await self.stream_create('foo', '0.01', data=('0' * 3 * 1024 * 1024).encode()) - await self.stream_create('foo', '0.01', data=('0' * 2 * 1024 * 1024).encode()) - status = await self.daemon.jsonrpc_status() - self.assertIn('disk_space', status) - self.assertEqual(status['disk_space']['used'], '5') + self.assertEqual('0', status['disk_space']['space_used']) + self.assertEqual(True, status['disk_space']['running']) + await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode()) + await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode()) + self.assertEqual('5', (await self.status())['disk_space']['space_used']) + await self.blob_clean() + self.assertEqual('5', (await self.status())['disk_space']['space_used']) + self.daemon.conf.blob_storage_limit = 3 + await self.blob_clean() + self.assertEqual('3', (await self.status())['disk_space']['space_used']) diff --git a/tests/unit/blob/test_disk_space_manager.py b/tests/unit/blob/test_disk_space_manager.py index 95bd1960f..35f11212c 100644 --- a/tests/unit/blob/test_disk_space_manager.py +++ b/tests/unit/blob/test_disk_space_manager.py @@ -9,16 +9,21 @@ from lbry.blob.disk_space_manager import DiskSpaceManager class ConfigurationTests(unittest.TestCase): - def test_space_calculation(self): + def test_space_management(self): with tempfile.TemporaryDirectory() as temp_dir: os.mkdir(os.path.join(temp_dir, 'blobfiles')) config = Config( + blob_storage_limit=5, data_dir=temp_dir, wallet_dir=temp_dir, - config=os.path.join(temp_dir, 'settings.yml') + config=os.path.join(temp_dir, 'settings.yml'), ) dsm = DiskSpaceManager(config) self.assertEqual(0, dsm.space_used_mb) - with open(os.path.join(config.data_dir, 'blobfiles', '3mb-file'), 'w') as blob: - blob.write('0' * 3 * 1024 * 1024) - self.assertEqual(3, dsm.space_used_mb) + for file_no in range(10): + with open(os.path.join(config.data_dir, 'blobfiles', f'3mb-{file_no}'), 'w') as blob: + blob.write('0' * 1 * 1024 * 1024) + self.assertEqual(10, dsm.space_used_mb) + dsm.clean() + self.assertEqual(4, dsm.space_used_mb) +