From 07a78cf73d2698f4dacc9c1a26d89be95001eff2 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Wed, 8 Sep 2021 10:55:21 -0400 Subject: [PATCH] use database to track blob disk space use and preserve own blobs --- lbry/blob/blob_file.py | 10 ++++- lbry/blob/blob_info.py | 8 +++- lbry/blob/blob_manager.py | 8 +++- lbry/blob/disk_space_manager.py | 45 +++++++------------ lbry/extras/daemon/components.py | 16 ++++--- lbry/extras/daemon/daemon.py | 2 +- lbry/extras/daemon/storage.py | 37 +++++++++++---- .../datanetwork/test_file_commands.py | 9 ++-- 8 files changed, 84 insertions(+), 51 deletions(-) diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py index d702d0a06..cda667561 100644 --- a/lbry/blob/blob_file.py +++ b/lbry/blob/blob_file.py @@ -1,5 +1,6 @@ import os import re +import time import asyncio import binascii import logging @@ -70,12 +71,15 @@ class AbstractBlob: 'writers', 'verified', 'writing', - 'readers' + 'readers', + 'is_mine', + 'added_on', ] def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, - blob_directory: typing.Optional[str] = None): + blob_directory: typing.Optional[str] = None, is_mine: bool = False, + added_on: typing.Optional[int] = None): self.loop = loop self.blob_hash = blob_hash self.length = length @@ -85,6 +89,8 @@ class AbstractBlob: self.verified: asyncio.Event = asyncio.Event(loop=self.loop) self.writing: asyncio.Event = asyncio.Event(loop=self.loop) self.readers: typing.List[typing.BinaryIO] = [] + self.is_mine = is_mine + self.added_on = added_on or time.time() if not is_valid_blobhash(blob_hash): raise InvalidBlobHashError(blob_hash) diff --git a/lbry/blob/blob_info.py b/lbry/blob/blob_info.py index febfdcb65..408b6047b 100644 --- a/lbry/blob/blob_info.py +++ b/lbry/blob/blob_info.py @@ -7,13 +7,19 @@ class BlobInfo: 'blob_num', 'length', 'iv', + 'added_on', + 'is_mine' ] - 
def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None): + def __init__( + self, blob_num: int, length: int, iv: str, + blob_hash: typing.Optional[str] = None, added_on=None, is_mine=None): self.blob_hash = blob_hash self.blob_num = blob_num self.length = length self.iv = iv + self.added_on = added_on + self.is_mine = is_mine def as_dict(self) -> typing.Dict: d = { diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py index 4a7e54740..c99a017b9 100644 --- a/lbry/blob/blob_manager.py +++ b/lbry/blob/blob_manager.py @@ -105,9 +105,13 @@ class BlobManager: if isinstance(blob, BlobFile): if blob.blob_hash not in self.completed_blob_hashes: self.completed_blob_hashes.add(blob.blob_hash) - return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=True)) + return self.loop.create_task(self.storage.add_blobs( + (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=True) + ) else: - return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=False)) + return self.loop.create_task(self.storage.add_blobs( + (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False) + ) def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]: """Returns of the blobhashes_to_check, which are valid""" diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index f42a43be4..2a19180dc 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -7,51 +7,38 @@ log = logging.getLogger(__name__) class DiskSpaceManager: - def __init__(self, config, cleaning_interval=30 * 60): + def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60): self.config = config + self.db = db + self.blob_manager = blob_manager self.cleaning_interval = cleaning_interval self.running = False self.task = None - @property - def space_used_bytes(self): - used = 0 - data_dir = 
os.path.join(self.config.data_dir, 'blobfiles') - for item in os.scandir(data_dir): - if item.is_file: - used += item.stat().st_size - return used + async def get_space_used_bytes(self): + return await self.db.get_stored_blob_disk_usage() - @property - def space_used_mb(self): - return int(self.space_used_bytes/1024.0/1024.0) + async def get_space_used_mb(self): + return int(await self.get_space_used_bytes()/1024.0/1024.0) - def clean(self): + async def clean(self): if not self.config.blob_storage_limit: return 0 - used = 0 - files = [] - data_dir = os.path.join(self.config.data_dir, 'blobfiles') - for file in os.scandir(data_dir): - if file.is_file: - file_stats = file.stat() - used += file_stats.st_size - files.append((file_stats.st_mtime, file_stats.st_size, file.path)) - files.sort() - available = (self.config.blob_storage_limit*1024*1024) - used - cleaned = 0 - for _, file_size, file in files: + delete = [] + available = (self.config.blob_storage_limit*1024*1024) - await self.db.get_stored_blob_disk_usage() + for blob_hash, file_size in await self.db.get_stored_blobs(is_mine=False): available += file_size if available > 0: break - os.remove(file) - cleaned += 1 - return cleaned + delete.append(blob_hash) + if delete: + await self.blob_manager.delete_blobs(delete, delete_from_db=True) + return len(delete) async def cleaning_loop(self): while self.running: await asyncio.sleep(self.cleaning_interval) - await asyncio.get_event_loop().run_in_executor(None, self.clean) + await self.clean() async def start(self): self.running = True diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index a383b9ea5..18d846375 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -379,22 +379,28 @@ class FileManagerComponent(Component): class DiskSpaceComponent(Component): component_name = DISK_SPACE_COMPONENT + depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] def __init__(self, component_manager): 
super().__init__(component_manager) - self.disk_space_manager = DiskSpaceManager(self.conf) + self.disk_space_manager: typing.Optional[DiskSpaceManager] = None @property def component(self) -> typing.Optional[DiskSpaceManager]: return self.disk_space_manager async def get_status(self): - return { - 'space_used': str(self.disk_space_manager.space_used_mb), - 'running': self.disk_space_manager.running, - } + if self.disk_space_manager: + return { + 'space_used': str(await self.disk_space_manager.get_space_used_mb()), + 'running': self.disk_space_manager.running, + } + return {'space_used': '0', 'running': False} async def start(self): + db = self.component_manager.get_component(DATABASE_COMPONENT) + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + self.disk_space_manager = DiskSpaceManager(self.conf, db, blob_manager) await self.disk_space_manager.start() async def stop(self): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 15374fa23..d0220cff6 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -5041,7 +5041,7 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (bool) true if successful """ - return self.disk_space_manager.clean() + return await self.disk_space_manager.clean() @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_reflect(self, **kwargs): diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 38e6caf68..83dc3b0eb 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -170,8 +170,8 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'): # add all blobs, except the last one, which is empty transaction.executemany( - "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", - ((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0) + "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, 
?, ?)", + ((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0, blob.added_on, blob.is_mine) for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob]) ).fetchall() # associate the blobs to the stream @@ -242,7 +242,9 @@ class SQLiteStorage(SQLiteMixin): should_announce integer not null default 0, status text not null, last_announced_time integer, - single_announce integer + single_announce integer, + added_on integer not null, + is_mine integer not null default 0 ); create table if not exists stream ( @@ -356,19 +358,19 @@ class SQLiteStorage(SQLiteMixin): # # # # # # # # # blob functions # # # # # # # # # - async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int], finished=False): + async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int, int, int], finished=False): def _add_blobs(transaction: sqlite3.Connection): transaction.executemany( - "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", + "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)", ( - (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0) - for blob_hash, length in blob_hashes_and_lengths + (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0, added_on, is_mine) + for blob_hash, length, added_on, is_mine in blob_hashes_and_lengths ) ).fetchall() if finished: transaction.executemany( "update blob set status='finished' where blob.blob_hash=?", ( - (blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths + (blob_hash, ) for blob_hash, _, _, _ in blob_hashes_and_lengths ) ).fetchall() return await self.db.run(_add_blobs) @@ -435,6 +437,25 @@ class SQLiteStorage(SQLiteMixin): def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") + async def get_stored_blobs(self, is_mine): + return await self.db.execute_fetchall( + "select blob_hash, blob_length from blob where is_mine=? 
order by added_on", (is_mine,) + ) + + async def get_stored_blob_disk_usage(self, is_mine=None): + if is_mine is None: + sql, args = "select coalesce(sum(blob_length), 0) from blob", () + else: + sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,) + return (await self.db.execute_fetchone(sql, args))[0] + + async def update_blob_ownership(self, stream_hash, is_mine: bool): + await self.db.execute_fetchall( + "update blob set is_mine = ? where blob_hash in (" + " select blob_hash from stream_blob where stream_hash = ?" + ")", (1 if is_mine else 0, stream_hash) + ) + def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]: def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]: finished_blob_hashes = tuple( diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index dccaca324..70f9c4b2c 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -522,9 +522,12 @@ class DiskSpaceManagement(CommandTestCase): self.assertEqual(True, status['disk_space']['running']) await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode()) await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode()) - self.assertEqual('5', (await self.status())['disk_space']['space_used']) + stream = await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode()) + stream_hash = stream['outputs'][0]['value']['source']['hash'] + await self.daemon.storage.update_blob_ownership(stream_hash, False) + self.assertEqual('7', (await self.status())['disk_space']['space_used']) await self.blob_clean() - self.assertEqual('5', (await self.status())['disk_space']['space_used']) + self.assertEqual('7', (await self.status())['disk_space']['space_used']) self.daemon.conf.blob_storage_limit = 3 await self.blob_clean() - self.assertEqual('3', (await 
self.status())['disk_space']['space_used']) + self.assertEqual('2', (await self.status())['disk_space']['space_used'])