Merge pull request #3378 from lbryio/disk_management

ability to limit disk space used for blobs via `blob_storage_limit` setting (oldest blobs are deleted when disk space limit is reached)
This commit is contained in:
Lex Berezhny 2021-08-16 17:41:05 -04:00 committed by GitHub
commit bbf1ef0dc3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 200 additions and 6 deletions

View file

@ -31,8 +31,8 @@ jobs:
matrix: matrix:
os: os:
- ubuntu-latest - ubuntu-latest
#- macos-latest - macos-latest
#- windows-latest - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
@ -49,10 +49,18 @@ jobs:
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- run: pip install --user --upgrade pip wheel - run: pip install --user --upgrade pip wheel
- run: pip install -e .[torrent,test] - if: startsWith(runner.os, 'linux')
- env: run: pip install -e .[torrent,test]
- if: startsWith(runner.os, 'linux')
env:
HOME: /tmp HOME: /tmp
run: make test-unit-coverage run: make test-unit-coverage
- if: startsWith(runner.os, 'linux') != true
run: pip install -e .[test]
- if: startsWith(runner.os, 'linux') != true
env:
HOME: /tmp
run: python -m unittest tests/unit/test_conf.py
tests-integration: tests-integration:
name: "tests / integration" name: "tests / integration"

View file

@ -0,0 +1,64 @@
import os
import asyncio
import logging
log = logging.getLogger(__name__)
class DiskSpaceManager:
    """Measure and limit the disk space used by files in ``<data_dir>/blobfiles``.

    The limit is read from ``config.blob_storage_limit`` (in MB); a falsy value
    disables cleaning. Once started, a background task runs :meth:`clean` every
    ``cleaning_interval`` seconds.
    """

    def __init__(self, config, cleaning_interval=30 * 60):
        """
        :param config: object exposing ``data_dir`` and ``blob_storage_limit``.
        :param cleaning_interval: seconds to sleep between cleaning passes
            (defaults to 30 minutes).
        """
        self.config = config
        self.cleaning_interval = cleaning_interval
        self.running = False
        self.task = None

    def _scan_blob_files(self):
        """Return a list of ``(mtime, size, path)`` for every regular file in the blob directory."""
        blob_dir = os.path.join(self.config.data_dir, 'blobfiles')
        found = []
        # The context manager closes the directory handle even if stat() raises.
        with os.scandir(blob_dir) as entries:
            for entry in entries:
                # is_file must be *called*: the bare bound method is always truthy,
                # which would let sub-directories be counted (and later os.remove()d).
                if entry.is_file():
                    stats = entry.stat()
                    found.append((stats.st_mtime, stats.st_size, entry.path))
        return found

    @property
    def space_used_bytes(self):
        """Total size in bytes of the regular files directly inside the blob directory."""
        return sum(size for _, size, _ in self._scan_blob_files())

    @property
    def space_used_mb(self):
        """Same as :attr:`space_used_bytes`, converted to whole megabytes (truncated)."""
        return int(self.space_used_bytes/1024.0/1024.0)

    def clean(self):
        """Delete the oldest blob files while usage exceeds the configured limit.

        Files are processed in ascending order of modification time. Deletion
        stops at the first file whose removal would bring usage strictly below
        the limit, so the remaining usage may still exceed the limit by less
        than the size of that file.

        :return: number of files deleted (0 when no limit is configured).
        """
        if not self.config.blob_storage_limit:
            return 0
        files = self._scan_blob_files()
        files.sort()  # oldest first; ties fall back to size, then path
        used = sum(size for _, size, _ in files)
        # Negative while usage is over the limit (limit is configured in MB).
        available = (self.config.blob_storage_limit*1024*1024) - used
        cleaned = 0
        for _, file_size, path in files:
            available += file_size
            if available > 0:
                break
            os.remove(path)
            cleaned += 1
        return cleaned

    async def cleaning_loop(self):
        """Run :meth:`clean` in the default executor, then sleep, until ``running`` is cleared."""
        while self.running:
            # clean() does blocking filesystem work, so keep it off the event loop.
            await asyncio.get_running_loop().run_in_executor(None, self.clean)
            await asyncio.sleep(self.cleaning_interval)

    async def start(self):
        """Mark the manager as running and schedule :meth:`cleaning_loop` as a task."""
        self.running = True
        self.task = asyncio.create_task(self.cleaning_loop())
        self.task.add_done_callback(lambda _: log.info("Stopping blob cleanup service."))

    async def stop(self):
        """Clear ``running`` and cancel the background task, if the manager was started."""
        if self.running:
            self.running = False
            self.task.cancel()

View file

@ -634,6 +634,7 @@ class Config(CLIConfig):
# blob announcement and download # blob announcement and download
save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True) save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
blob_storage_limit = Integer("Disk space in MB to be allocated for blob storage. 0 = no limit", 0)
blob_lru_cache_size = Integer( blob_lru_cache_size = Integer(
"LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when " "LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when "
"replying to a range request. Set to 0 to disable.", 32 "replying to a range request. Set to 0 to disable.", 32

View file

@ -15,6 +15,7 @@ from lbry.dht.node import Node
from lbry.dht.peer import is_valid_public_ipv4 from lbry.dht.peer import is_valid_public_ipv4
from lbry.dht.blob_announcer import BlobAnnouncer from lbry.dht.blob_announcer import BlobAnnouncer
from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_manager import BlobManager
from lbry.blob.disk_space_manager import DiskSpaceManager
from lbry.blob_exchange.server import BlobServer from lbry.blob_exchange.server import BlobServer
from lbry.stream.stream_manager import StreamManager from lbry.stream.stream_manager import StreamManager
from lbry.file.file_manager import FileManager from lbry.file.file_manager import FileManager
@ -40,6 +41,7 @@ WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments"
DHT_COMPONENT = "dht" DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer" HASH_ANNOUNCER_COMPONENT = "hash_announcer"
FILE_MANAGER_COMPONENT = "file_manager" FILE_MANAGER_COMPONENT = "file_manager"
DISK_SPACE_COMPONENT = "disk_space"
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp" UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
@ -375,6 +377,30 @@ class FileManagerComponent(Component):
self.file_manager.stop() self.file_manager.stop()
class DiskSpaceComponent(Component):
    """Daemon component that owns a DiskSpaceManager and exposes it under DISK_SPACE_COMPONENT."""
    component_name = DISK_SPACE_COMPONENT

    def __init__(self, component_manager):
        super().__init__(component_manager)
        # self.conf is presumably provided by the Component base class -- confirm there.
        self.disk_space_manager = DiskSpaceManager(self.conf)

    @property
    def component(self) -> typing.Optional[DiskSpaceManager]:
        """The managed DiskSpaceManager instance."""
        return self.disk_space_manager

    async def get_status(self):
        """Report the used space (whole MB, as a string) and whether the manager is running."""
        manager = self.disk_space_manager
        status = {}
        status['space_used'] = str(manager.space_used_mb)
        status['running'] = manager.running
        return status

    async def start(self):
        """Start the wrapped manager."""
        manager = self.disk_space_manager
        await manager.start()

    async def stop(self):
        """Stop the wrapped manager."""
        manager = self.disk_space_manager
        await manager.stop()
class TorrentComponent(Component): class TorrentComponent(Component):
component_name = LIBTORRENT_COMPONENT component_name = LIBTORRENT_COMPONENT

View file

@ -41,7 +41,7 @@ from lbry.error import (
from lbry.extras import system_info from lbry.extras import system_info
from lbry.extras.daemon import analytics from lbry.extras.daemon import analytics
from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT
from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT
from lbry.extras.daemon.componentmanager import RequiredCondition from lbry.extras.daemon.componentmanager import RequiredCondition
from lbry.extras.daemon.componentmanager import ComponentManager from lbry.extras.daemon.componentmanager import ComponentManager
@ -56,7 +56,7 @@ from lbry.schema.url import URL
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_manager import BlobManager
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.extras.daemon.components import UPnPComponent from lbry.extras.daemon.components import UPnPComponent, DiskSpaceManager
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import WalletManager, Ledger from lbry.wallet import WalletManager, Ledger
@ -375,6 +375,10 @@ class Daemon(metaclass=JSONRPCServerType):
def blob_manager(self) -> typing.Optional['BlobManager']: def blob_manager(self) -> typing.Optional['BlobManager']:
return self.component_manager.get_component(BLOB_COMPONENT) return self.component_manager.get_component(BLOB_COMPONENT)
@property
def disk_space_manager(self) -> typing.Optional['DiskSpaceManager']:
    """Look up whatever the component manager has registered under DISK_SPACE_COMPONENT."""
    components = self.component_manager
    return components.get_component(DISK_SPACE_COMPONENT)
@property @property
def upnp(self) -> typing.Optional['UPnPComponent']: def upnp(self) -> typing.Optional['UPnPComponent']:
return self.component_manager.get_component(UPNP_COMPONENT) return self.component_manager.get_component(UPNP_COMPONENT)
@ -4916,6 +4920,22 @@ class Daemon(metaclass=JSONRPCServerType):
raise NotImplementedError() raise NotImplementedError()
@requires(DISK_SPACE_COMPONENT)
async def jsonrpc_blob_clean(self):
    """
    Deletes blobs to cleanup disk space

    Usage:
        blob_clean

    Options:
        None

    Returns:
        (int) number of blob files that were deleted
    """
    # Imported locally so this method does not rely on a module-level import.
    import asyncio
    # clean() scans and deletes files synchronously; run it in the default
    # executor so the event loop is not blocked while it works.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, self.disk_space_manager.clean)
@requires(FILE_MANAGER_COMPONENT) @requires(FILE_MANAGER_COMPONENT)
async def jsonrpc_file_reflect(self, **kwargs): async def jsonrpc_file_reflect(self, **kwargs):
""" """

View file

@ -607,6 +607,12 @@ class CommandTestCase(IntegrationTestCase):
await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs]) await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs])
return self.sout(txs) return self.sout(txs)
async def blob_clean(self):
    """Invoke the daemon's `jsonrpc_blob_clean` and return its result passed through `self.out`."""
    pending = self.daemon.jsonrpc_blob_clean()
    return await self.out(pending)
async def status(self):
    """Invoke the daemon's `jsonrpc_status` and return its result passed through `self.out`."""
    pending = self.daemon.jsonrpc_status()
    return await self.out(pending)
async def resolve(self, uri, **kwargs): async def resolve(self, uri, **kwargs):
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri] return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]

View file

@ -511,3 +511,20 @@ class FileCommands(CommandTestCase):
await tx.sign([self.account]) await tx.sign([self.account])
await self.broadcast(tx) await self.broadcast(tx)
await self.confirm_tx(tx.id) await self.confirm_tx(tx.id)
class DiskSpaceManagement(CommandTestCase):
    """Integration test for the disk space component and the `blob_clean` command."""

    async def _space_used(self):
        """Fetch the `space_used` value reported in the daemon status."""
        report = await self.status()
        return report['disk_space']['space_used']

    async def test_file_management(self):
        # A fresh daemon reports the component as running with no space used.
        status = await self.status()
        self.assertIn('disk_space', status)
        disk_space = status['disk_space']
        self.assertEqual('0', disk_space['space_used'])
        self.assertEqual(True, disk_space['running'])

        # Publish a 3 MB and a 2 MB stream.
        for name, megabytes in (('foo1', 3), ('foo2', 2)):
            await self.stream_create(name, '0.01', data=('0' * megabytes * 1024 * 1024).encode())
        self.assertEqual('5', await self._space_used())

        # Cleaning without a configured limit must not change the usage.
        await self.blob_clean()
        self.assertEqual('5', await self._space_used())

        # With a 3 MB limit, cleaning brings reported usage down to 3 MB.
        self.daemon.conf.blob_storage_limit = 3
        await self.blob_clean()
        self.assertEqual('3', await self._space_used())

View file

@ -0,0 +1,30 @@
import os
import unittest
import tempfile
import lbry.wallet
from lbry.conf import Config
from lbry.blob.disk_space_manager import DiskSpaceManager
class ConfigurationTests(unittest.TestCase):
    """Unit test for DiskSpaceManager against a temporary blob directory."""

    def test_space_management(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            blob_dir = os.path.join(temp_dir, 'blobfiles')
            os.mkdir(blob_dir)
            settings_path = os.path.join(temp_dir, 'settings.yml')
            config = Config(
                blob_storage_limit=5,
                data_dir=temp_dir,
                wallet_dir=temp_dir,
                config=settings_path,
            )
            dsm = DiskSpaceManager(config)
            self.assertEqual(0, dsm.space_used_mb)

            # Ten files of 1 MB each (despite the "3mb" prefix in their names).
            payload = '0' * 1 * 1024 * 1024
            for file_no in range(10):
                target = os.path.join(config.data_dir, 'blobfiles', f'3mb-{file_no}')
                with open(target, 'w') as blob:
                    blob.write(payload)
            self.assertEqual(10, dsm.space_used_mb)

            # The first pass deletes files down to the 5 MB limit; the second finds nothing to do.
            self.assertTrue(dsm.clean())
            self.assertEqual(5, dsm.space_used_mb)
            self.assertFalse(dsm.clean())

View file

@ -15,6 +15,7 @@ class TestComponentManager(AsyncioTestCase):
self.default_components_sort = [ self.default_components_sort = [
[ [
components.DatabaseComponent, components.DatabaseComponent,
components.DiskSpaceComponent,
components.ExchangeRateManagerComponent, components.ExchangeRateManagerComponent,
components.TorrentComponent, components.TorrentComponent,
components.UPnPComponent components.UPnPComponent

View file

@ -21,6 +21,27 @@ class TestConfig(BaseConfig):
class ConfigurationTests(unittest.TestCase): class ConfigurationTests(unittest.TestCase):
@unittest.skipIf('darwin' not in sys.platform, 'skipping mac only test')
def test_mac_defaults(self):
    """Check the default paths and API URL of a bare Config() on macOS."""
    conf = Config()
    expected_data_dir = os.path.expanduser("~/Library/Application Support/LBRY")
    expected_wallet_dir = os.path.expanduser('~/.lbryum')
    expected_download_dir = os.path.expanduser('~/Downloads')
    self.assertEqual(conf.data_dir, expected_data_dir)
    self.assertEqual(conf.wallet_dir, expected_wallet_dir)
    self.assertEqual(conf.download_dir, expected_download_dir)
    # Settings and log files are expected to live inside the data directory.
    self.assertEqual(conf.config, os.path.join(conf.data_dir, 'daemon_settings.yml'))
    self.assertEqual(conf.api_connection_url, 'http://localhost:5279/lbryapi')
    self.assertEqual(conf.log_file_path, os.path.join(conf.data_dir, 'lbrynet.log'))
@unittest.skipIf('win32' not in sys.platform, 'skipping windows only test')
def test_windows_defaults(self):
    """Check the default paths and API URL of a bare Config() on Windows."""
    conf = Config()
    # Expected locations are built from the login name under C:\Users.
    user_home = os.path.join(r"C:\Users", os.getlogin())
    prefix = os.path.join(user_home, r"AppData\Local\lbry")
    self.assertEqual(conf.data_dir, os.path.join(prefix, 'lbrynet'))
    self.assertEqual(conf.wallet_dir, os.path.join(prefix, 'lbryum'))
    self.assertEqual(conf.download_dir, os.path.join(user_home, "Downloads"))
    # Settings and log files are expected to live inside the data directory.
    self.assertEqual(conf.config, os.path.join(conf.data_dir, 'daemon_settings.yml'))
    self.assertEqual(conf.api_connection_url, 'http://localhost:5279/lbryapi')
    self.assertEqual(conf.log_file_path, os.path.join(conf.data_dir, 'lbrynet.log'))
@unittest.skipIf('linux' not in sys.platform, 'skipping linux only test') @unittest.skipIf('linux' not in sys.platform, 'skipping linux only test')
def test_linux_defaults(self): def test_linux_defaults(self):
c = Config() c = Config()