use databse to track blob disk space use and preserve own blobs

This commit is contained in:
Lex Berezhny 2021-09-08 10:55:21 -04:00 committed by Victor Shyba
parent 93ac2e3bc9
commit 07a78cf73d
8 changed files with 84 additions and 51 deletions

View file

@ -1,5 +1,6 @@
import os
import re
import time
import asyncio
import binascii
import logging
@ -70,12 +71,15 @@ class AbstractBlob:
'writers',
'verified',
'writing',
'readers'
'readers',
'is_mine',
'added_on',
]
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None):
blob_directory: typing.Optional[str] = None, is_mine: bool = False,
added_on: typing.Optional[int] = None):
self.loop = loop
self.blob_hash = blob_hash
self.length = length
@ -85,6 +89,8 @@ class AbstractBlob:
self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
self.readers: typing.List[typing.BinaryIO] = []
self.is_mine = is_mine
self.added_on = added_on or time.time()
if not is_valid_blobhash(blob_hash):
raise InvalidBlobHashError(blob_hash)

View file

@ -7,13 +7,19 @@ class BlobInfo:
'blob_num',
'length',
'iv',
'added_on',
'is_mine'
]
def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None):
def __init__(
self, blob_num: int, length: int, iv: str,
blob_hash: typing.Optional[str] = None, added_on=None, is_mine=None):
self.blob_hash = blob_hash
self.blob_num = blob_num
self.length = length
self.iv = iv
self.added_on = added_on
self.is_mine = is_mine
def as_dict(self) -> typing.Dict:
d = {

View file

@ -105,9 +105,13 @@ class BlobManager:
if isinstance(blob, BlobFile):
if blob.blob_hash not in self.completed_blob_hashes:
self.completed_blob_hashes.add(blob.blob_hash)
return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=True))
return self.loop.create_task(self.storage.add_blobs(
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=True)
)
else:
return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=False))
return self.loop.create_task(self.storage.add_blobs(
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
)
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
"""Returns of the blobhashes_to_check, which are valid"""

View file

@ -7,51 +7,38 @@ log = logging.getLogger(__name__)
class DiskSpaceManager:
def __init__(self, config, cleaning_interval=30 * 60):
def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60):
self.config = config
self.db = db
self.blob_manager = blob_manager
self.cleaning_interval = cleaning_interval
self.running = False
self.task = None
@property
def space_used_bytes(self):
used = 0
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
for item in os.scandir(data_dir):
if item.is_file:
used += item.stat().st_size
return used
async def get_space_used_bytes(self):
return await self.db.get_stored_blob_disk_usage()
@property
def space_used_mb(self):
return int(self.space_used_bytes/1024.0/1024.0)
async def get_space_used_mb(self):
return int(await self.get_space_used_bytes()/1024.0/1024.0)
def clean(self):
async def clean(self):
if not self.config.blob_storage_limit:
return 0
used = 0
files = []
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
for file in os.scandir(data_dir):
if file.is_file:
file_stats = file.stat()
used += file_stats.st_size
files.append((file_stats.st_mtime, file_stats.st_size, file.path))
files.sort()
available = (self.config.blob_storage_limit*1024*1024) - used
cleaned = 0
for _, file_size, file in files:
delete = []
available = (self.config.blob_storage_limit*1024*1024) - await self.db.get_stored_blob_disk_usage()
for blob_hash, file_size in await self.db.get_stored_blobs(is_mine=False):
available += file_size
if available > 0:
break
os.remove(file)
cleaned += 1
return cleaned
delete.append(blob_hash)
if delete:
await self.blob_manager.delete_blobs(delete, delete_from_db=True)
return len(delete)
async def cleaning_loop(self):
while self.running:
await asyncio.sleep(self.cleaning_interval)
await asyncio.get_event_loop().run_in_executor(None, self.clean)
await self.clean()
async def start(self):
self.running = True

View file

@ -379,22 +379,28 @@ class FileManagerComponent(Component):
class DiskSpaceComponent(Component):
component_name = DISK_SPACE_COMPONENT
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]
def __init__(self, component_manager):
super().__init__(component_manager)
self.disk_space_manager = DiskSpaceManager(self.conf)
self.disk_space_manager: typing.Optional[DiskSpaceManager] = None
@property
def component(self) -> typing.Optional[DiskSpaceManager]:
return self.disk_space_manager
async def get_status(self):
return {
'space_used': str(self.disk_space_manager.space_used_mb),
'running': self.disk_space_manager.running,
}
if self.disk_space_manager:
return {
'space_used': str(await self.disk_space_manager.get_space_used_mb()),
'running': self.disk_space_manager.running,
}
return {'space_used': '0', 'running': False}
async def start(self):
db = self.component_manager.get_component(DATABASE_COMPONENT)
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
self.disk_space_manager = DiskSpaceManager(self.conf, db, blob_manager)
await self.disk_space_manager.start()
async def stop(self):

View file

@ -5041,7 +5041,7 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(bool) true if successful
"""
return self.disk_space_manager.clean()
return await self.disk_space_manager.clean()
@requires(FILE_MANAGER_COMPONENT)
async def jsonrpc_file_reflect(self, **kwargs):

View file

@ -170,8 +170,8 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
# add all blobs, except the last one, which is empty
transaction.executemany(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0)
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0, blob.added_on, blob.is_mine)
for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob])
).fetchall()
# associate the blobs to the stream
@ -242,7 +242,9 @@ class SQLiteStorage(SQLiteMixin):
should_announce integer not null default 0,
status text not null,
last_announced_time integer,
single_announce integer
single_announce integer,
added_on integer not null,
is_mine integer not null default 0
);
create table if not exists stream (
@ -356,19 +358,19 @@ class SQLiteStorage(SQLiteMixin):
# # # # # # # # # blob functions # # # # # # # # #
async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int], finished=False):
async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int, int, int], finished=False):
def _add_blobs(transaction: sqlite3.Connection):
transaction.executemany(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
(blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0)
for blob_hash, length in blob_hashes_and_lengths
(blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0, added_on, is_mine)
for blob_hash, length, added_on, is_mine in blob_hashes_and_lengths
)
).fetchall()
if finished:
transaction.executemany(
"update blob set status='finished' where blob.blob_hash=?", (
(blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths
(blob_hash, ) for blob_hash, _, _, _ in blob_hashes_and_lengths
)
).fetchall()
return await self.db.run(_add_blobs)
@ -435,6 +437,25 @@ class SQLiteStorage(SQLiteMixin):
def get_all_blob_hashes(self):
return self.run_and_return_list("select blob_hash from blob")
async def get_stored_blobs(self, is_mine):
return await self.db.execute_fetchall(
"select blob_hash, blob_length from blob where is_mine=? order by added_on", (is_mine,)
)
async def get_stored_blob_disk_usage(self, is_mine=None):
if is_mine is None:
sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
else:
sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
return (await self.db.execute_fetchone(sql, args))[0]
async def update_blob_ownership(self, stream_hash, is_mine: bool):
await self.db.execute_fetchall(
"update blob set is_mine = ? where blob_hash in ("
" select blob_hash from stream_blob where stream_hash = ?"
")", (1 if is_mine else 0, stream_hash)
)
def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
finished_blob_hashes = tuple(

View file

@ -522,9 +522,12 @@ class DiskSpaceManagement(CommandTestCase):
self.assertEqual(True, status['disk_space']['running'])
await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
stream = await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
stream_hash = stream['outputs'][0]['value']['source']['hash']
await self.daemon.storage.update_blob_ownership(stream_hash, False)
self.assertEqual('7', (await self.status())['disk_space']['space_used'])
await self.blob_clean()
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
self.assertEqual('7', (await self.status())['disk_space']['space_used'])
self.daemon.conf.blob_storage_limit = 3
await self.blob_clean()
self.assertEqual('3', (await self.status())['disk_space']['space_used'])
self.assertEqual('2', (await self.status())['disk_space']['space_used'])