diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index cda667561..42ed3a856 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -72,14 +72,15 @@ class AbstractBlob:
         'verified',
         'writing',
         'readers',
-        'is_mine',
         'added_on',
+        'is_mine',
     ]
 
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-                 blob_directory: typing.Optional[str] = None, is_mine: bool = False,
-                 added_on: typing.Optional[int] = None):
+    def __init__(
+            self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
+            blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False,
+    ):
         self.loop = loop
         self.blob_hash = blob_hash
         self.length = length
@@ -89,8 +90,8 @@ class AbstractBlob:
         self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
         self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
         self.readers: typing.List[typing.BinaryIO] = []
-        self.is_mine = is_mine
         self.added_on = added_on or time.time()
+        self.is_mine = is_mine
 
         if not is_valid_blobhash(blob_hash):
             raise InvalidBlobHashError(blob_hash)
@@ -186,20 +187,21 @@ class AbstractBlob:
 
     @classmethod
     async def create_from_unencrypted(
-            cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
-            unencrypted: bytes, blob_num: int,
-            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo:
+            cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
+            unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
+    ) -> BlobInfo:
         """
         Create an encrypted BlobFile from plaintext bytes
         """
 
         blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
         length = len(blob_bytes)
-        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir)
+        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir, added_on, is_mine)
         writer = blob.get_blob_writer()
         writer.write(blob_bytes)
         await blob.verified.wait()
-        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash)
+        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash, added_on, is_mine)
 
     def save_verified_blob(self, verified_bytes: bytes):
         if self.verified.is_set():
@@ -254,11 +256,13 @@ class BlobBuffer(AbstractBlob):
     """
     An in-memory only blob
     """
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-                 blob_directory: typing.Optional[str] = None):
+    def __init__(
+            self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
+            blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
+    ):
         self._verified_bytes: typing.Optional[BytesIO] = None
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
 
     @contextlib.contextmanager
     def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
@@ -295,10 +299,12 @@ class BlobFile(AbstractBlob):
     """
    A blob existing on the local file system
     """
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-                 blob_directory: typing.Optional[str] = None):
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
+    def __init__(
+            self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
+            blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
+    ):
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
         if not blob_directory or not os.path.isdir(blob_directory):
             raise OSError(f"invalid blob directory '{blob_directory}'")
         self.file_path = os.path.join(self.blob_directory, self.blob_hash)
@@ -349,12 +355,12 @@ class BlobFile(AbstractBlob):
 
     @classmethod
     async def create_from_unencrypted(
-            cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
-            unencrypted: bytes, blob_num: int,
-            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
-                                                                     asyncio.Task]] = None) -> BlobInfo:
+            cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
+            unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
+    ) -> BlobInfo:
         if not blob_dir or not os.path.isdir(blob_dir):
             raise OSError(f"cannot create blob in directory: '{blob_dir}'")
         return await super().create_from_unencrypted(
-            loop, blob_dir, key, iv, unencrypted, blob_num, blob_completed_callback
+            loop, blob_dir, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
         )
diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py
index 2a19180dc..4ed445129 100644
--- a/lbry/blob/disk_space_manager.py
+++ b/lbry/blob/disk_space_manager.py
@@ -25,12 +25,14 @@ class DiskSpaceManager:
         if not self.config.blob_storage_limit:
             return 0
         delete = []
-        available = (self.config.blob_storage_limit*1024*1024) - await self.db.get_stored_blob_disk_usage()
-        for blob_hash, file_size in await self.db.get_stored_blobs(is_mine=False):
+        available = (self.config.blob_storage_limit*1024*1024) - await self.get_space_used_bytes()
+        if available > 0:
+            return 0
+        for blob_hash, file_size, added_on in await self.db.get_stored_blobs(is_mine=False):
+            delete.append(blob_hash)
             available += file_size
             if available > 0:
                 break
-            delete.append(blob_hash)
         if delete:
             await self.blob_manager.delete_blobs(delete, delete_from_db=True)
         return len(delete)
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 18d846375..e38d4835f 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -61,7 +61,7 @@ class DatabaseComponent(Component):
 
     @staticmethod
     def get_current_db_revision():
-        return 14
+        return 15
 
     @property
     def revision_filename(self):
diff --git a/lbry/extras/daemon/migrator/dbmigrator.py b/lbry/extras/daemon/migrator/dbmigrator.py
index 726cc1974..a2b1211e7 100644
--- a/lbry/extras/daemon/migrator/dbmigrator.py
+++ b/lbry/extras/daemon/migrator/dbmigrator.py
@@ -35,6 +35,8 @@ def migrate_db(conf, start, end):
         from .migrate12to13 import do_migration
     elif current == 13:
         from .migrate13to14 import do_migration
+    elif current == 14:
+        from .migrate14to15 import do_migration
     else:
         raise Exception(f"DB migration of version {current} to {current+1} is not available")
     try:
diff --git a/lbry/extras/daemon/migrator/migrate14to15.py b/lbry/extras/daemon/migrator/migrate14to15.py
new file mode 100644
index 000000000..a0b7257e3
--- /dev/null
+++ b/lbry/extras/daemon/migrator/migrate14to15.py
@@ -0,0 +1,16 @@
+import os
+import sqlite3
+
+
+def do_migration(conf):
+    db_path = os.path.join(conf.data_dir, "lbrynet.sqlite")
+    connection = sqlite3.connect(db_path)
+    cursor = connection.cursor()
+
+    cursor.executescript("""
+        alter table blob add column added_on integer not null default 0;
+        alter table blob add column is_mine integer not null default 0;
+    """)
+
+    connection.commit()
+    connection.close()
diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index 83dc3b0eb..4b51074ec 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -437,23 +437,27 @@ class SQLiteStorage(SQLiteMixin):
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")
 
-    async def get_stored_blobs(self, is_mine):
+    async def get_stored_blobs(self, is_mine: bool):
+        is_mine = 1 if is_mine else 0
         return await self.db.execute_fetchall(
-            "select blob_hash, blob_length from blob where is_mine=? order by added_on", (is_mine,)
+            "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc",
+            (is_mine,)
         )
 
-    async def get_stored_blob_disk_usage(self, is_mine=None):
+    async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None):
         if is_mine is None:
             sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
         else:
+            is_mine = 1 if is_mine else 0
             sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
         return (await self.db.execute_fetchone(sql, args))[0]
 
-    async def update_blob_ownership(self, stream_hash, is_mine: bool):
+    async def update_blob_ownership(self, sd_hash, is_mine: bool):
+        is_mine = 1 if is_mine else 0
         await self.db.execute_fetchall(
             "update blob set is_mine = ? where blob_hash in ("
-            "    select blob_hash from stream_blob where stream_hash = ?"
-            ")", (1 if is_mine else 0, stream_hash)
+            "    select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
+            ")", (is_mine, sd_hash)
         )
 
     def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
diff --git a/lbry/stream/descriptor.py b/lbry/stream/descriptor.py
index fadabf29c..bd0cd2d97 100644
--- a/lbry/stream/descriptor.py
+++ b/lbry/stream/descriptor.py
@@ -4,6 +4,7 @@ import binascii
 import logging
 import typing
 import asyncio
+import time
 import re
 from collections import OrderedDict
 from cryptography.hazmat.primitives.ciphers.algorithms import AES
@@ -252,14 +253,17 @@ class StreamDescriptor:
         iv_generator = iv_generator or random_iv_generator()
         key = key or os.urandom(AES.block_size // 8)
         blob_num = -1
+        added_on = time.time()
         async for blob_bytes in file_reader(file_path):
             blob_num += 1
             blob_info = await BlobFile.create_from_unencrypted(
-                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback
+                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, added_on, True, blob_completed_callback
             )
             blobs.append(blob_info)
         blobs.append(
-            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode()))  # add the stream terminator
+            # add the stream terminator
+            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), None, added_on, True)
+        )
         file_name = os.path.basename(file_path)
         suggested_file_name = sanitize_file_name(file_name)
         descriptor = cls(
diff --git a/lbry/testcase.py b/lbry/testcase.py
index 8e022399b..70fe84159 100644
--- a/lbry/testcase.py
+++ b/lbry/testcase.py
@@ -649,6 +649,9 @@ class CommandTestCase(IntegrationTestCase):
     async def transaction_list(self, *args, **kwargs):
         return (await self.out(self.daemon.jsonrpc_transaction_list(*args, **kwargs)))['items']
 
+    async def blob_list(self, *args, **kwargs):
+        return (await self.out(self.daemon.jsonrpc_blob_list(*args, **kwargs)))['items']
+
     @staticmethod
     def get_claim_id(tx):
         return tx['outputs'][0]['claim_id']
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 70f9c4b2c..994ad3506 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -515,19 +515,46 @@ class FileCommands(CommandTestCase):
 
 
 class DiskSpaceManagement(CommandTestCase):
+    async def get_referenced_blobs(self, tx):
+        sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
+        stream_hash = await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
+        return sd_hash, set(await self.blob_list(
+            stream_hash=stream_hash
+        ))
+
     async def test_file_management(self):
         status = await self.status()
         self.assertIn('disk_space', status)
         self.assertEqual('0', status['disk_space']['space_used'])
         self.assertEqual(True, status['disk_space']['running'])
-        await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
-        await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
-        stream = await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
-        stream_hash = stream['outputs'][0]['value']['source']['hash']
-        await self.daemon.storage.update_blob_ownership(stream_hash, False)
-        self.assertEqual('7', (await self.status())['disk_space']['space_used'])
+        sd_hash1, blobs1 = await self.get_referenced_blobs(
+            await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
+        )
+        sd_hash2, blobs2 = await self.get_referenced_blobs(
+            await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+        sd_hash3, blobs3 = await self.get_referenced_blobs(
+            await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+        sd_hash4, blobs4 = await self.get_referenced_blobs(
+            await self.stream_create('foo4', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+
+        await self.daemon.storage.update_blob_ownership(sd_hash1, False)
+        await self.daemon.storage.update_blob_ownership(sd_hash3, False)
+        await self.daemon.storage.update_blob_ownership(sd_hash4, False)
+
+        self.assertEqual('9', (await self.status())['disk_space']['space_used'])
+        self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
+
         await self.blob_clean()
-        self.assertEqual('7', (await self.status())['disk_space']['space_used'])
-        self.daemon.conf.blob_storage_limit = 3
+
+        self.assertEqual('9', (await self.status())['disk_space']['space_used'])
+        self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
+
+        self.daemon.conf.blob_storage_limit = 5
         await self.blob_clean()
-        self.assertEqual('2', (await self.status())['disk_space']['space_used'])
+
+        self.assertEqual('4', (await self.status())['disk_space']['space_used'])
+        blobs = set(await self.blob_list())
+        self.assertEqual(blobs2 | blobs4, blobs)