working disk cleanup
This commit is contained in:
parent
b4c3307cdf
commit
51d21d8c86
6 changed files with 97 additions and 21 deletions
|
@ -1,21 +1,61 @@
|
||||||
import os
|
import os
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DiskSpaceManager:
|
class DiskSpaceManager:
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config, cleaning_interval=30 * 60):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
self.cleaning_interval = cleaning_interval
|
||||||
|
self.running = False
|
||||||
|
self.task = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def space_used_bytes(self):
|
def space_used_bytes(self):
|
||||||
used = 0
|
used = 0
|
||||||
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
|
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
|
||||||
for item in os.listdir(data_dir):
|
for item in os.scandir(data_dir):
|
||||||
blob_path = os.path.join(data_dir, item)
|
if item.is_file:
|
||||||
if os.path.isfile(blob_path):
|
used += item.stat().st_size
|
||||||
used += os.path.getsize(blob_path)
|
|
||||||
return used
|
return used
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def space_used_mb(self):
|
def space_used_mb(self):
|
||||||
return int(self.space_used_bytes/1024.0/1024.0)
|
return int(self.space_used_bytes/1024.0/1024.0)
|
||||||
|
|
||||||
|
def clean(self):
|
||||||
|
if not self.config.blob_storage_limit:
|
||||||
|
return
|
||||||
|
used = 0
|
||||||
|
files = []
|
||||||
|
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
|
||||||
|
for file in os.scandir(data_dir):
|
||||||
|
if file.is_file:
|
||||||
|
file_stats = file.stat()
|
||||||
|
used += file_stats.st_size
|
||||||
|
files.append((file_stats.st_mtime, file_stats.st_size, file))
|
||||||
|
files.sort()
|
||||||
|
available = (self.config.blob_storage_limit*1024*1024) - used
|
||||||
|
for _, file_size, file in files:
|
||||||
|
available += file_size
|
||||||
|
if available > 0:
|
||||||
|
break
|
||||||
|
os.remove(file)
|
||||||
|
|
||||||
|
async def cleaning_loop(self):
|
||||||
|
while self.running:
|
||||||
|
await asyncio.get_event_loop().run_in_executor(None, self.clean)
|
||||||
|
await asyncio.sleep(self.cleaning_interval)
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
self.running = True
|
||||||
|
self.task = asyncio.create_task(self.cleaning_loop())
|
||||||
|
self.task.add_done_callback(lambda _: log.info("Stopping blob cleanup service."))
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
if self.running:
|
||||||
|
self.running = False
|
||||||
|
self.task.cancel()
|
||||||
|
|
|
@ -390,14 +390,15 @@ class DiskSpaceComponent(Component):
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
return {
|
return {
|
||||||
'used': str(self.disk_space_manager.space_used_mb),
|
'space_used': str(self.disk_space_manager.space_used_mb),
|
||||||
|
'running': self.disk_space_manager.running,
|
||||||
}
|
}
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
pass
|
await self.disk_space_manager.start()
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
pass
|
await self.disk_space_manager.stop()
|
||||||
|
|
||||||
|
|
||||||
class TorrentComponent(Component):
|
class TorrentComponent(Component):
|
||||||
|
|
|
@ -56,7 +56,7 @@ from lbry.schema.url import URL
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
from lbry.extras.daemon.components import UPnPComponent
|
from lbry.extras.daemon.components import UPnPComponent, DiskSpaceManager
|
||||||
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
from lbry.wallet import WalletManager, Ledger
|
from lbry.wallet import WalletManager, Ledger
|
||||||
|
@ -375,6 +375,10 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
def blob_manager(self) -> typing.Optional['BlobManager']:
|
def blob_manager(self) -> typing.Optional['BlobManager']:
|
||||||
return self.component_manager.get_component(BLOB_COMPONENT)
|
return self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def disk_space_manager(self) -> typing.Optional['DiskSpaceManager']:
|
||||||
|
return self.component_manager.get_component(DISK_SPACE_COMPONENT)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def upnp(self) -> typing.Optional['UPnPComponent']:
|
def upnp(self) -> typing.Optional['UPnPComponent']:
|
||||||
return self.component_manager.get_component(UPNP_COMPONENT)
|
return self.component_manager.get_component(UPNP_COMPONENT)
|
||||||
|
@ -4916,6 +4920,22 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
|
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@requires(DISK_SPACE_COMPONENT)
|
||||||
|
async def jsonrpc_blob_clean(self):
|
||||||
|
"""
|
||||||
|
Deletes blobs to cleanup disk space
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
blob_clean
|
||||||
|
|
||||||
|
Options:
|
||||||
|
None
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(bool) true if successful
|
||||||
|
"""
|
||||||
|
return self.disk_space_manager.clean()
|
||||||
|
|
||||||
@requires(FILE_MANAGER_COMPONENT)
|
@requires(FILE_MANAGER_COMPONENT)
|
||||||
async def jsonrpc_file_reflect(self, **kwargs):
|
async def jsonrpc_file_reflect(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -607,6 +607,12 @@ class CommandTestCase(IntegrationTestCase):
|
||||||
await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs])
|
await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs])
|
||||||
return self.sout(txs)
|
return self.sout(txs)
|
||||||
|
|
||||||
|
async def blob_clean(self):
|
||||||
|
return await self.out(self.daemon.jsonrpc_blob_clean())
|
||||||
|
|
||||||
|
async def status(self):
|
||||||
|
return await self.out(self.daemon.jsonrpc_status())
|
||||||
|
|
||||||
async def resolve(self, uri, **kwargs):
|
async def resolve(self, uri, **kwargs):
|
||||||
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
|
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
|
||||||
|
|
||||||
|
|
|
@ -516,11 +516,15 @@ class FileCommands(CommandTestCase):
|
||||||
class DiskSpaceManagement(CommandTestCase):
|
class DiskSpaceManagement(CommandTestCase):
|
||||||
|
|
||||||
async def test_file_management(self):
|
async def test_file_management(self):
|
||||||
status = await self.daemon.jsonrpc_status()
|
status = await self.status()
|
||||||
self.assertIn('disk_space', status)
|
self.assertIn('disk_space', status)
|
||||||
self.assertEqual(status['disk_space']['used'], '0')
|
self.assertEqual('0', status['disk_space']['space_used'])
|
||||||
await self.stream_create('foo', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
self.assertEqual(True, status['disk_space']['running'])
|
||||||
await self.stream_create('foo', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
||||||
status = await self.daemon.jsonrpc_status()
|
await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||||
self.assertIn('disk_space', status)
|
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
|
||||||
self.assertEqual(status['disk_space']['used'], '5')
|
await self.blob_clean()
|
||||||
|
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
|
||||||
|
self.daemon.conf.blob_storage_limit = 3
|
||||||
|
await self.blob_clean()
|
||||||
|
self.assertEqual('3', (await self.status())['disk_space']['space_used'])
|
||||||
|
|
|
@ -9,16 +9,21 @@ from lbry.blob.disk_space_manager import DiskSpaceManager
|
||||||
|
|
||||||
class ConfigurationTests(unittest.TestCase):
|
class ConfigurationTests(unittest.TestCase):
|
||||||
|
|
||||||
def test_space_calculation(self):
|
def test_space_management(self):
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
os.mkdir(os.path.join(temp_dir, 'blobfiles'))
|
os.mkdir(os.path.join(temp_dir, 'blobfiles'))
|
||||||
config = Config(
|
config = Config(
|
||||||
|
blob_storage_limit=5,
|
||||||
data_dir=temp_dir,
|
data_dir=temp_dir,
|
||||||
wallet_dir=temp_dir,
|
wallet_dir=temp_dir,
|
||||||
config=os.path.join(temp_dir, 'settings.yml')
|
config=os.path.join(temp_dir, 'settings.yml'),
|
||||||
)
|
)
|
||||||
dsm = DiskSpaceManager(config)
|
dsm = DiskSpaceManager(config)
|
||||||
self.assertEqual(0, dsm.space_used_mb)
|
self.assertEqual(0, dsm.space_used_mb)
|
||||||
with open(os.path.join(config.data_dir, 'blobfiles', '3mb-file'), 'w') as blob:
|
for file_no in range(10):
|
||||||
blob.write('0' * 3 * 1024 * 1024)
|
with open(os.path.join(config.data_dir, 'blobfiles', f'3mb-{file_no}'), 'w') as blob:
|
||||||
self.assertEqual(3, dsm.space_used_mb)
|
blob.write('0' * 1 * 1024 * 1024)
|
||||||
|
self.assertEqual(10, dsm.space_used_mb)
|
||||||
|
dsm.clean()
|
||||||
|
self.assertEqual(4, dsm.space_used_mb)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue