use databse to track blob disk space use and preserve own blobs
This commit is contained in:
parent
2d9e3e1847
commit
ef89c2e47a
8 changed files with 84 additions and 51 deletions
|
@ -1,5 +1,6 @@
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
import asyncio
|
import asyncio
|
||||||
import binascii
|
import binascii
|
||||||
import logging
|
import logging
|
||||||
|
@ -70,12 +71,15 @@ class AbstractBlob:
|
||||||
'writers',
|
'writers',
|
||||||
'verified',
|
'verified',
|
||||||
'writing',
|
'writing',
|
||||||
'readers'
|
'readers',
|
||||||
|
'is_mine',
|
||||||
|
'added_on',
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
|
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
|
||||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
|
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
|
||||||
blob_directory: typing.Optional[str] = None):
|
blob_directory: typing.Optional[str] = None, is_mine: bool = False,
|
||||||
|
added_on: typing.Optional[int] = None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.blob_hash = blob_hash
|
self.blob_hash = blob_hash
|
||||||
self.length = length
|
self.length = length
|
||||||
|
@ -85,6 +89,8 @@ class AbstractBlob:
|
||||||
self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
|
self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
|
||||||
self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
|
self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
|
||||||
self.readers: typing.List[typing.BinaryIO] = []
|
self.readers: typing.List[typing.BinaryIO] = []
|
||||||
|
self.is_mine = is_mine
|
||||||
|
self.added_on = added_on or time.time()
|
||||||
|
|
||||||
if not is_valid_blobhash(blob_hash):
|
if not is_valid_blobhash(blob_hash):
|
||||||
raise InvalidBlobHashError(blob_hash)
|
raise InvalidBlobHashError(blob_hash)
|
||||||
|
|
|
@ -7,13 +7,19 @@ class BlobInfo:
|
||||||
'blob_num',
|
'blob_num',
|
||||||
'length',
|
'length',
|
||||||
'iv',
|
'iv',
|
||||||
|
'added_on',
|
||||||
|
'is_mine'
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None):
|
def __init__(
|
||||||
|
self, blob_num: int, length: int, iv: str,
|
||||||
|
blob_hash: typing.Optional[str] = None, added_on=None, is_mine=None):
|
||||||
self.blob_hash = blob_hash
|
self.blob_hash = blob_hash
|
||||||
self.blob_num = blob_num
|
self.blob_num = blob_num
|
||||||
self.length = length
|
self.length = length
|
||||||
self.iv = iv
|
self.iv = iv
|
||||||
|
self.added_on = added_on
|
||||||
|
self.is_mine = is_mine
|
||||||
|
|
||||||
def as_dict(self) -> typing.Dict:
|
def as_dict(self) -> typing.Dict:
|
||||||
d = {
|
d = {
|
||||||
|
|
|
@ -105,9 +105,13 @@ class BlobManager:
|
||||||
if isinstance(blob, BlobFile):
|
if isinstance(blob, BlobFile):
|
||||||
if blob.blob_hash not in self.completed_blob_hashes:
|
if blob.blob_hash not in self.completed_blob_hashes:
|
||||||
self.completed_blob_hashes.add(blob.blob_hash)
|
self.completed_blob_hashes.add(blob.blob_hash)
|
||||||
return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=True))
|
return self.loop.create_task(self.storage.add_blobs(
|
||||||
|
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=True)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=False))
|
return self.loop.create_task(self.storage.add_blobs(
|
||||||
|
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
|
||||||
|
)
|
||||||
|
|
||||||
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
||||||
"""Returns of the blobhashes_to_check, which are valid"""
|
"""Returns of the blobhashes_to_check, which are valid"""
|
||||||
|
|
|
@ -7,51 +7,38 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DiskSpaceManager:
|
class DiskSpaceManager:
|
||||||
|
|
||||||
def __init__(self, config, cleaning_interval=30 * 60):
|
def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
self.db = db
|
||||||
|
self.blob_manager = blob_manager
|
||||||
self.cleaning_interval = cleaning_interval
|
self.cleaning_interval = cleaning_interval
|
||||||
self.running = False
|
self.running = False
|
||||||
self.task = None
|
self.task = None
|
||||||
|
|
||||||
@property
|
async def get_space_used_bytes(self):
|
||||||
def space_used_bytes(self):
|
return await self.db.get_stored_blob_disk_usage()
|
||||||
used = 0
|
|
||||||
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
|
|
||||||
for item in os.scandir(data_dir):
|
|
||||||
if item.is_file:
|
|
||||||
used += item.stat().st_size
|
|
||||||
return used
|
|
||||||
|
|
||||||
@property
|
async def get_space_used_mb(self):
|
||||||
def space_used_mb(self):
|
return int(await self.get_space_used_bytes()/1024.0/1024.0)
|
||||||
return int(self.space_used_bytes/1024.0/1024.0)
|
|
||||||
|
|
||||||
def clean(self):
|
async def clean(self):
|
||||||
if not self.config.blob_storage_limit:
|
if not self.config.blob_storage_limit:
|
||||||
return 0
|
return 0
|
||||||
used = 0
|
delete = []
|
||||||
files = []
|
available = (self.config.blob_storage_limit*1024*1024) - await self.db.get_stored_blob_disk_usage()
|
||||||
data_dir = os.path.join(self.config.data_dir, 'blobfiles')
|
for blob_hash, file_size in await self.db.get_stored_blobs(is_mine=False):
|
||||||
for file in os.scandir(data_dir):
|
|
||||||
if file.is_file:
|
|
||||||
file_stats = file.stat()
|
|
||||||
used += file_stats.st_size
|
|
||||||
files.append((file_stats.st_mtime, file_stats.st_size, file.path))
|
|
||||||
files.sort()
|
|
||||||
available = (self.config.blob_storage_limit*1024*1024) - used
|
|
||||||
cleaned = 0
|
|
||||||
for _, file_size, file in files:
|
|
||||||
available += file_size
|
available += file_size
|
||||||
if available > 0:
|
if available > 0:
|
||||||
break
|
break
|
||||||
os.remove(file)
|
delete.append(blob_hash)
|
||||||
cleaned += 1
|
if delete:
|
||||||
return cleaned
|
await self.blob_manager.delete_blobs(delete, delete_from_db=True)
|
||||||
|
return len(delete)
|
||||||
|
|
||||||
async def cleaning_loop(self):
|
async def cleaning_loop(self):
|
||||||
while self.running:
|
while self.running:
|
||||||
await asyncio.sleep(self.cleaning_interval)
|
await asyncio.sleep(self.cleaning_interval)
|
||||||
await asyncio.get_event_loop().run_in_executor(None, self.clean)
|
await self.clean()
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
|
@ -379,22 +379,28 @@ class FileManagerComponent(Component):
|
||||||
|
|
||||||
class DiskSpaceComponent(Component):
|
class DiskSpaceComponent(Component):
|
||||||
component_name = DISK_SPACE_COMPONENT
|
component_name = DISK_SPACE_COMPONENT
|
||||||
|
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.disk_space_manager = DiskSpaceManager(self.conf)
|
self.disk_space_manager: typing.Optional[DiskSpaceManager] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> typing.Optional[DiskSpaceManager]:
|
def component(self) -> typing.Optional[DiskSpaceManager]:
|
||||||
return self.disk_space_manager
|
return self.disk_space_manager
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
|
if self.disk_space_manager:
|
||||||
return {
|
return {
|
||||||
'space_used': str(self.disk_space_manager.space_used_mb),
|
'space_used': str(await self.disk_space_manager.get_space_used_mb()),
|
||||||
'running': self.disk_space_manager.running,
|
'running': self.disk_space_manager.running,
|
||||||
}
|
}
|
||||||
|
return {'space_used': '0', 'running': False}
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
db = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||||
|
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
|
self.disk_space_manager = DiskSpaceManager(self.conf, db, blob_manager)
|
||||||
await self.disk_space_manager.start()
|
await self.disk_space_manager.start()
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
|
|
@ -5041,7 +5041,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
Returns:
|
Returns:
|
||||||
(bool) true if successful
|
(bool) true if successful
|
||||||
"""
|
"""
|
||||||
return self.disk_space_manager.clean()
|
return await self.disk_space_manager.clean()
|
||||||
|
|
||||||
@requires(FILE_MANAGER_COMPONENT)
|
@requires(FILE_MANAGER_COMPONENT)
|
||||||
async def jsonrpc_file_reflect(self, **kwargs):
|
async def jsonrpc_file_reflect(self, **kwargs):
|
||||||
|
|
|
@ -170,8 +170,8 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
|
||||||
def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
|
def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
|
||||||
# add all blobs, except the last one, which is empty
|
# add all blobs, except the last one, which is empty
|
||||||
transaction.executemany(
|
transaction.executemany(
|
||||||
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
|
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0)
|
((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0, blob.added_on, blob.is_mine)
|
||||||
for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob])
|
for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob])
|
||||||
).fetchall()
|
).fetchall()
|
||||||
# associate the blobs to the stream
|
# associate the blobs to the stream
|
||||||
|
@ -242,7 +242,9 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
should_announce integer not null default 0,
|
should_announce integer not null default 0,
|
||||||
status text not null,
|
status text not null,
|
||||||
last_announced_time integer,
|
last_announced_time integer,
|
||||||
single_announce integer
|
single_announce integer,
|
||||||
|
added_on integer not null,
|
||||||
|
is_mine integer not null default 0
|
||||||
);
|
);
|
||||||
|
|
||||||
create table if not exists stream (
|
create table if not exists stream (
|
||||||
|
@ -356,19 +358,19 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
|
|
||||||
# # # # # # # # # blob functions # # # # # # # # #
|
# # # # # # # # # blob functions # # # # # # # # #
|
||||||
|
|
||||||
async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int], finished=False):
|
async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int, int, int], finished=False):
|
||||||
def _add_blobs(transaction: sqlite3.Connection):
|
def _add_blobs(transaction: sqlite3.Connection):
|
||||||
transaction.executemany(
|
transaction.executemany(
|
||||||
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
|
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
(
|
(
|
||||||
(blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0)
|
(blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0, added_on, is_mine)
|
||||||
for blob_hash, length in blob_hashes_and_lengths
|
for blob_hash, length, added_on, is_mine in blob_hashes_and_lengths
|
||||||
)
|
)
|
||||||
).fetchall()
|
).fetchall()
|
||||||
if finished:
|
if finished:
|
||||||
transaction.executemany(
|
transaction.executemany(
|
||||||
"update blob set status='finished' where blob.blob_hash=?", (
|
"update blob set status='finished' where blob.blob_hash=?", (
|
||||||
(blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths
|
(blob_hash, ) for blob_hash, _, _, _ in blob_hashes_and_lengths
|
||||||
)
|
)
|
||||||
).fetchall()
|
).fetchall()
|
||||||
return await self.db.run(_add_blobs)
|
return await self.db.run(_add_blobs)
|
||||||
|
@ -435,6 +437,25 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
def get_all_blob_hashes(self):
|
def get_all_blob_hashes(self):
|
||||||
return self.run_and_return_list("select blob_hash from blob")
|
return self.run_and_return_list("select blob_hash from blob")
|
||||||
|
|
||||||
|
async def get_stored_blobs(self, is_mine):
|
||||||
|
return await self.db.execute_fetchall(
|
||||||
|
"select blob_hash, blob_length from blob where is_mine=? order by added_on", (is_mine,)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_stored_blob_disk_usage(self, is_mine=None):
|
||||||
|
if is_mine is None:
|
||||||
|
sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
|
||||||
|
else:
|
||||||
|
sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
|
||||||
|
return (await self.db.execute_fetchone(sql, args))[0]
|
||||||
|
|
||||||
|
async def update_blob_ownership(self, stream_hash, is_mine: bool):
|
||||||
|
await self.db.execute_fetchall(
|
||||||
|
"update blob set is_mine = ? where blob_hash in ("
|
||||||
|
" select blob_hash from stream_blob where stream_hash = ?"
|
||||||
|
")", (1 if is_mine else 0, stream_hash)
|
||||||
|
)
|
||||||
|
|
||||||
def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
|
def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
|
||||||
def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
|
def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
|
||||||
finished_blob_hashes = tuple(
|
finished_blob_hashes = tuple(
|
||||||
|
|
|
@ -522,9 +522,12 @@ class DiskSpaceManagement(CommandTestCase):
|
||||||
self.assertEqual(True, status['disk_space']['running'])
|
self.assertEqual(True, status['disk_space']['running'])
|
||||||
await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
||||||
await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||||
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
|
stream = await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||||
|
stream_hash = stream['outputs'][0]['value']['source']['hash']
|
||||||
|
await self.daemon.storage.update_blob_ownership(stream_hash, False)
|
||||||
|
self.assertEqual('7', (await self.status())['disk_space']['space_used'])
|
||||||
await self.blob_clean()
|
await self.blob_clean()
|
||||||
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
|
self.assertEqual('7', (await self.status())['disk_space']['space_used'])
|
||||||
self.daemon.conf.blob_storage_limit = 3
|
self.daemon.conf.blob_storage_limit = 3
|
||||||
await self.blob_clean()
|
await self.blob_clean()
|
||||||
self.assertEqual('3', (await self.status())['disk_space']['space_used'])
|
self.assertEqual('2', (await self.status())['disk_space']['space_used'])
|
||||||
|
|
Loading…
Reference in a new issue