Merge pull request #3417 from lbryio/preserve_own_blobs

use database to track blob disk space use and preserve own blobs
Lex Berezhny 2021-09-20 11:32:59 -04:00 committed by GitHub
commit 5baeda9ff1
19 changed files with 211 additions and 131 deletions
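
The change replaces filesystem scanning with database accounting: every blob row now records its length, when it was added, and whether it is the user's own upload, and cleanup deletes the oldest blobs that are NOT the user's own until usage fits under blob_storage_limit. A minimal sketch of that selection policy follows; pick_blobs_to_delete is a hypothetical stand-in for illustration, not a function from this diff.

    # Sketch of the selection policy this commit implements (illustrative names).
    def pick_blobs_to_delete(not_mine_blobs, storage_limit_bytes, used_bytes):
        # not_mine_blobs: (blob_hash, blob_length, added_on) rows with is_mine=0,
        # ordered oldest first, as get_stored_blobs(is_mine=False) returns them.
        available = storage_limit_bytes - used_bytes
        delete = []
        if available > 0:
            return delete  # already under the limit, nothing to clean
        for blob_hash, blob_length, _added_on in not_mine_blobs:
            delete.append(blob_hash)
            available += blob_length
            if available > 0:
                break
        return delete

    # 10 MB stored against a 6 MB limit: the two oldest downloaded blobs go.
    assert pick_blobs_to_delete(
        [("b1", 3 << 20, 1), ("b2", 3 << 20, 2), ("b3", 2 << 20, 3)],
        6 << 20, 10 << 20,
    ) == ["b1", "b2"]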


@@ -68,7 +68,7 @@ jobs:
         COVERALLS_PARALLEL: true
       run: |
         pip install coveralls
-        coveralls --service=github
+        #coveralls --service=github

   tests-integration:
     name: "tests / integration"
@@ -114,7 +114,7 @@ jobs:
         COVERALLS_PARALLEL: true
       run: |
         coverage combine tests
-        coveralls --service=github
+        #coveralls --service=github

   coverage:
@@ -126,7 +126,7 @@ jobs:
       GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
     run: |
       pip install coveralls
-      coveralls --service=github --finish
+      #coveralls --service=github --finish

   build:
     needs: ["lint", "tests-unit", "tests-integration"]


@ -1,5 +1,6 @@
import os import os
import re import re
import time
import asyncio import asyncio
import binascii import binascii
import logging import logging
@ -70,12 +71,16 @@ class AbstractBlob:
'writers', 'writers',
'verified', 'verified',
'writing', 'writing',
'readers' 'readers',
'added_on',
'is_mine',
] ]
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, def __init__(
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_directory: typing.Optional[str] = None): blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False,
):
self.loop = loop self.loop = loop
self.blob_hash = blob_hash self.blob_hash = blob_hash
self.length = length self.length = length
@ -85,6 +90,8 @@ class AbstractBlob:
self.verified: asyncio.Event = asyncio.Event(loop=self.loop) self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
self.writing: asyncio.Event = asyncio.Event(loop=self.loop) self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
self.readers: typing.List[typing.BinaryIO] = [] self.readers: typing.List[typing.BinaryIO] = []
self.added_on = added_on or time.time()
self.is_mine = is_mine
if not is_valid_blobhash(blob_hash): if not is_valid_blobhash(blob_hash):
raise InvalidBlobHashError(blob_hash) raise InvalidBlobHashError(blob_hash)
@ -180,20 +187,21 @@ class AbstractBlob:
@classmethod @classmethod
async def create_from_unencrypted( async def create_from_unencrypted(
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
unencrypted: bytes, blob_num: int, unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo: blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
) -> BlobInfo:
""" """
Create an encrypted BlobFile from plaintext bytes Create an encrypted BlobFile from plaintext bytes
""" """
blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted) blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
length = len(blob_bytes) length = len(blob_bytes)
blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir) blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir, added_on, is_mine)
writer = blob.get_blob_writer() writer = blob.get_blob_writer()
writer.write(blob_bytes) writer.write(blob_bytes)
await blob.verified.wait() await blob.verified.wait()
return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash) return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash, added_on, is_mine)
def save_verified_blob(self, verified_bytes: bytes): def save_verified_blob(self, verified_bytes: bytes):
if self.verified.is_set(): if self.verified.is_set():
@ -248,11 +256,13 @@ class BlobBuffer(AbstractBlob):
""" """
An in-memory only blob An in-memory only blob
""" """
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, def __init__(
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_directory: typing.Optional[str] = None): blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
):
self._verified_bytes: typing.Optional[BytesIO] = None self._verified_bytes: typing.Optional[BytesIO] = None
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory) super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
@contextlib.contextmanager @contextlib.contextmanager
def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]: def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
@ -289,10 +299,12 @@ class BlobFile(AbstractBlob):
""" """
A blob existing on the local file system A blob existing on the local file system
""" """
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, def __init__(
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_directory: typing.Optional[str] = None): blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory) blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
):
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
if not blob_directory or not os.path.isdir(blob_directory): if not blob_directory or not os.path.isdir(blob_directory):
raise OSError(f"invalid blob directory '{blob_directory}'") raise OSError(f"invalid blob directory '{blob_directory}'")
self.file_path = os.path.join(self.blob_directory, self.blob_hash) self.file_path = os.path.join(self.blob_directory, self.blob_hash)
@ -343,12 +355,12 @@ class BlobFile(AbstractBlob):
@classmethod @classmethod
async def create_from_unencrypted( async def create_from_unencrypted(
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
unencrypted: bytes, blob_num: int, unencrypted: bytes, blob_num: int, added_on: float, is_mine: bool,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
asyncio.Task]] = None) -> BlobInfo: ) -> BlobInfo:
if not blob_dir or not os.path.isdir(blob_dir): if not blob_dir or not os.path.isdir(blob_dir):
raise OSError(f"cannot create blob in directory: '{blob_dir}'") raise OSError(f"cannot create blob in directory: '{blob_dir}'")
return await super().create_from_unencrypted( return await super().create_from_unencrypted(
loop, blob_dir, key, iv, unencrypted, blob_num, blob_completed_callback loop, blob_dir, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
) )
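
Net effect of the hunks above: every blob now carries a creation timestamp and an ownership flag. A reduced sketch of the new constructor contract, using a hypothetical stand-in class rather than the real AbstractBlob:

    import time
    import typing

    class BlobRecord:  # stand-in mirroring AbstractBlob's new fields
        def __init__(self, blob_hash: str, length: typing.Optional[int] = None,
                     added_on: typing.Optional[float] = None, is_mine: bool = False):
            self.blob_hash = blob_hash
            self.length = length
            self.added_on = added_on or time.time()  # defaults to "now", as in the diff
            self.is_mine = is_mine

    published = BlobRecord("ab" * 48, length=2 * 1024 * 1024, is_mine=True)
    downloaded = BlobRecord("cd" * 48, length=2 * 1024 * 1024)  # is_mine stays False
    assert published.is_mine and not downloaded.is_mine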


@@ -7,13 +7,19 @@ class BlobInfo:
         'blob_num',
         'length',
         'iv',
+        'added_on',
+        'is_mine'
     ]

-    def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None):
+    def __init__(
+            self, blob_num: int, length: int, iv: str,
+            blob_hash: typing.Optional[str] = None, added_on=0, is_mine=False):
         self.blob_hash = blob_hash
         self.blob_num = blob_num
         self.length = length
         self.iv = iv
+        self.added_on = added_on
+        self.is_mine = is_mine

     def as_dict(self) -> typing.Dict:
         d = {

@@ -36,30 +36,30 @@ class BlobManager:
                                            self.config.blob_lru_cache_size)
         self.connection_manager = ConnectionManager(loop)

-    def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
+    def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None, is_mine: bool = False):
         if self.config.save_blobs or (
                 is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))):
             return BlobFile(
-                self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+                self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
             )
         return BlobBuffer(
-            self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+            self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
         )

-    def get_blob(self, blob_hash, length: typing.Optional[int] = None):
+    def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool = False):
         if blob_hash in self.blobs:
             if self.config.save_blobs and isinstance(self.blobs[blob_hash], BlobBuffer):
                 buffer = self.blobs.pop(blob_hash)
                 if blob_hash in self.completed_blob_hashes:
                     self.completed_blob_hashes.remove(blob_hash)
-                self.blobs[blob_hash] = self._get_blob(blob_hash, length)
+                self.blobs[blob_hash] = self._get_blob(blob_hash, length, is_mine)
                 if buffer.is_readable():
                     with buffer.reader_context() as reader:
                         self.blobs[blob_hash].write_blob(reader.read())
             if length and self.blobs[blob_hash].length is None:
                 self.blobs[blob_hash].set_length(length)
         else:
-            self.blobs[blob_hash] = self._get_blob(blob_hash, length)
+            self.blobs[blob_hash] = self._get_blob(blob_hash, length, is_mine)
         return self.blobs[blob_hash]

     def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
@@ -105,9 +105,13 @@ class BlobManager:
         if isinstance(blob, BlobFile):
             if blob.blob_hash not in self.completed_blob_hashes:
                 self.completed_blob_hashes.add(blob.blob_hash)
-            return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=True))
+            return self.loop.create_task(self.storage.add_blobs(
+                (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=True)
+            )
         else:
-            return self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=False))
+            return self.loop.create_task(self.storage.add_blobs(
+                (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
+            )

     def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
         """Returns of the blobhashes_to_check, which are valid"""

@@ -1,4 +1,3 @@
-import os
 import asyncio
 import logging
@@ -7,51 +6,41 @@ log = logging.getLogger(__name__)

 class DiskSpaceManager:

-    def __init__(self, config, cleaning_interval=30 * 60):
+    def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60):
         self.config = config
+        self.db = db
+        self.blob_manager = blob_manager
         self.cleaning_interval = cleaning_interval
         self.running = False
         self.task = None

-    @property
-    def space_used_bytes(self):
-        used = 0
-        data_dir = os.path.join(self.config.data_dir, 'blobfiles')
-        for item in os.scandir(data_dir):
-            if item.is_file:
-                used += item.stat().st_size
-        return used
+    async def get_space_used_bytes(self):
+        return await self.db.get_stored_blob_disk_usage()

-    @property
-    def space_used_mb(self):
-        return int(self.space_used_bytes/1024.0/1024.0)
+    async def get_space_used_mb(self):
+        return int(await self.get_space_used_bytes()/1024.0/1024.0)

-    def clean(self):
+    async def clean(self):
         if not self.config.blob_storage_limit:
             return 0
-        used = 0
-        files = []
-        data_dir = os.path.join(self.config.data_dir, 'blobfiles')
-        for file in os.scandir(data_dir):
-            if file.is_file:
-                file_stats = file.stat()
-                used += file_stats.st_size
-                files.append((file_stats.st_mtime, file_stats.st_size, file.path))
-        files.sort()
-        available = (self.config.blob_storage_limit*1024*1024) - used
-        cleaned = 0
-        for _, file_size, file in files:
+        delete = []
+        available = (self.config.blob_storage_limit*1024*1024) - await self.get_space_used_bytes()
+        if available > 0:
+            return 0
+        for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False):
+            delete.append(blob_hash)
             available += file_size
             if available > 0:
                 break
-            os.remove(file)
-            cleaned += 1
-        return cleaned
+        if delete:
+            await self.db.stop_all_files()
+            await self.blob_manager.delete_blobs(delete, delete_from_db=True)
+        return len(delete)

     async def cleaning_loop(self):
         while self.running:
             await asyncio.sleep(self.cleaning_interval)
-            await asyncio.get_event_loop().run_in_executor(None, self.clean)
+            await self.clean()

     async def start(self):
         self.running = True
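
With usage now read from the database, the manager is fully async: clean() is awaited directly instead of being pushed to a thread with run_in_executor. A usage sketch with stub dependencies — it assumes lbry is importable at this path (the import matches the module shown in this diff); FakeDB and FakeConfig are stand-ins:

    import asyncio

    from lbry.blob.disk_space_manager import DiskSpaceManager

    class FakeDB:  # stand-in for SQLiteStorage
        async def get_stored_blob_disk_usage(self):
            return 0  # pretend the blob table is empty

    class FakeConfig:  # only the field clean() reads
        blob_storage_limit = 5  # MB

    async def main():
        dsm = DiskSpaceManager(FakeConfig(), FakeDB(), blob_manager=None)
        assert await dsm.get_space_used_mb() == 0
        assert await dsm.clean() == 0  # under the limit, nothing deleted

    asyncio.run(main())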


@@ -61,7 +61,7 @@ class DatabaseComponent(Component):
     @staticmethod
     def get_current_db_revision():
-        return 14
+        return 15

     @property
     def revision_filename(self):
@@ -379,22 +379,28 @@ class FileManagerComponent(Component):

 class DiskSpaceComponent(Component):
     component_name = DISK_SPACE_COMPONENT
+    depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]

     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.disk_space_manager = DiskSpaceManager(self.conf)
+        self.disk_space_manager: typing.Optional[DiskSpaceManager] = None

     @property
     def component(self) -> typing.Optional[DiskSpaceManager]:
         return self.disk_space_manager

     async def get_status(self):
-        return {
-            'space_used': str(self.disk_space_manager.space_used_mb),
-            'running': self.disk_space_manager.running,
-        }
+        if self.disk_space_manager:
+            return {
+                'space_used': str(await self.disk_space_manager.get_space_used_mb()),
+                'running': self.disk_space_manager.running,
+            }
+        return {'space_used': '0', 'running': False}

     async def start(self):
+        db = self.component_manager.get_component(DATABASE_COMPONENT)
+        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
+        self.disk_space_manager = DiskSpaceManager(self.conf, db, blob_manager)
         await self.disk_space_manager.start()

     async def stop(self):


@@ -5041,7 +5041,7 @@ class Daemon(metaclass=JSONRPCServerType):
         Returns:
             (bool) true if successful
         """
-        return self.disk_space_manager.clean()
+        return await self.disk_space_manager.clean()

     @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_reflect(self, **kwargs):


@@ -35,6 +35,8 @@ def migrate_db(conf, start, end):
             from .migrate12to13 import do_migration
         elif current == 13:
             from .migrate13to14 import do_migration
+        elif current == 14:
+            from .migrate14to15 import do_migration
         else:
             raise Exception(f"DB migration of version {current} to {current+1} is not available")
     try:


@@ -0,0 +1,16 @@
+import os
+import sqlite3
+
+
+def do_migration(conf):
+    db_path = os.path.join(conf.data_dir, "lbrynet.sqlite")
+    connection = sqlite3.connect(db_path)
+    cursor = connection.cursor()
+
+    cursor.executescript("""
+        alter table blob add column added_on integer not null default 0;
+        alter table blob add column is_mine integer not null default 1;
+    """)
+
+    connection.commit()
+    connection.close()
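
Worth noting: the migration defaults existing rows to is_mine=1, so blobs already on disk before the upgrade are treated as the user's own and are never cleaned, while the new table schema defaults fresh rows to is_mine=0. A quick standalone check of that behavior — simplified two-column table for illustration, not the real schema:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("create table blob (blob_hash text primary key, blob_length integer)")
    conn.execute("insert into blob values ('deadbeef', 100)")
    # the same statements migrate14to15 runs against lbrynet.sqlite
    conn.executescript("""
        alter table blob add column added_on integer not null default 0;
        alter table blob add column is_mine integer not null default 1;
    """)
    added_on, is_mine = conn.execute(
        "select added_on, is_mine from blob where blob_hash='deadbeef'"
    ).fetchone()
    assert (added_on, is_mine) == (0, 1)  # pre-existing blob is preserved as "mine"
    conn.close()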


@@ -170,8 +170,8 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
 def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
     # add all blobs, except the last one, which is empty
     transaction.executemany(
-        "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
-        ((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0)
+        "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+        ((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0, blob.added_on, blob.is_mine)
          for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob])
     ).fetchall()
     # associate the blobs to the stream
@@ -242,7 +242,9 @@ class SQLiteStorage(SQLiteMixin):
             should_announce integer not null default 0,
             status text not null,
             last_announced_time integer,
-            single_announce integer
+            single_announce integer,
+            added_on integer not null,
+            is_mine integer not null default 0
         );

         create table if not exists stream (
@@ -356,19 +358,19 @@ class SQLiteStorage(SQLiteMixin):
     # # # # # # # # # blob functions # # # # # # # # #

-    async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int], finished=False):
+    async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int, int, int], finished=False):
         def _add_blobs(transaction: sqlite3.Connection):
             transaction.executemany(
-                "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
+                "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                 (
-                    (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0)
-                    for blob_hash, length in blob_hashes_and_lengths
+                    (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0, added_on, is_mine)
+                    for blob_hash, length, added_on, is_mine in blob_hashes_and_lengths
                 )
             ).fetchall()
             if finished:
                 transaction.executemany(
                     "update blob set status='finished' where blob.blob_hash=?", (
-                        (blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths
+                        (blob_hash, ) for blob_hash, _, _, _ in blob_hashes_and_lengths
                     )
                 ).fetchall()
         return await self.db.run(_add_blobs)
@@ -435,6 +437,29 @@ class SQLiteStorage(SQLiteMixin):
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")

+    async def get_stored_blobs(self, is_mine: bool):
+        is_mine = 1 if is_mine else 0
+        return await self.db.execute_fetchall(
+            "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc",
+            (is_mine,)
+        )
+
+    async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None):
+        if is_mine is None:
+            sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
+        else:
+            is_mine = 1 if is_mine else 0
+            sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
+        return (await self.db.execute_fetchone(sql, args))[0]
+
+    async def update_blob_ownership(self, sd_hash, is_mine: bool):
+        is_mine = 1 if is_mine else 0
+        await self.db.execute_fetchall(
+            "update blob set is_mine = ? where blob_hash in ("
+            "    select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
+            ") OR blob_hash = ?", (is_mine, sd_hash, sd_hash)
+        )
+
     def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
         def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
             finished_blob_hashes = tuple(
@@ -570,6 +595,10 @@ class SQLiteStorage(SQLiteMixin):
         log.debug("update file status %s -> %s", stream_hash, new_status)
         return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))

+    def stop_all_files(self):
+        log.debug("stopping all files")
+        return self.db.execute_fetchall("update file set status=?", ("stopped",))
+
     async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
                                                      file_name: typing.Optional[str]):
         if not file_name or not download_dir:
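
The new queries can be exercised directly against sqlite3 to see the accounting semantics. Only the SQL and column names below come from the diff; the bare-connection scaffolding is illustrative (the SDK wraps these in async helpers):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("create table blob (blob_hash text, blob_length integer, added_on integer, is_mine integer)")
    conn.executemany("insert into blob values (?, ?, ?, ?)", [
        ("aa", 3, 100, 0),  # downloaded
        ("bb", 2, 200, 0),  # downloaded later
        ("cc", 5, 300, 1),  # our own publish
    ])
    # get_stored_blob_disk_usage(): coalesce keeps an empty table at 0, not NULL
    total = conn.execute("select coalesce(sum(blob_length), 0) from blob").fetchone()[0]
    not_mine = conn.execute(
        "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (0,)
    ).fetchone()[0]
    assert (total, not_mine) == (10, 5)
    # get_stored_blobs(is_mine=False): oldest first, i.e. clean()'s deletion order
    rows = conn.execute(
        "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc", (0,)
    ).fetchall()
    assert [r[0] for r in rows] == ["aa", "bb"]
    conn.close()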


@@ -4,6 +4,7 @@ import binascii
 import logging
 import typing
 import asyncio
+import time
 import re
 from collections import OrderedDict
 from cryptography.hazmat.primitives.ciphers.algorithms import AES
@@ -152,15 +153,19 @@ class StreamDescriptor:
         h.update(self.old_sort_json())
         return h.hexdigest()

-    async def make_sd_blob(self, blob_file_obj: typing.Optional[AbstractBlob] = None,
-                           old_sort: typing.Optional[bool] = False,
-                           blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None):
+    async def make_sd_blob(
+            self, blob_file_obj: typing.Optional[AbstractBlob] = None, old_sort: typing.Optional[bool] = False,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
+            added_on: float = None, is_mine: bool = False
+    ):
         sd_hash = self.calculate_sd_hash() if not old_sort else self.calculate_old_sort_sd_hash()
         if not old_sort:
             sd_data = self.as_json()
         else:
             sd_data = self.old_sort_json()
-        sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir)
+        sd_blob = blob_file_obj or BlobFile(
+            self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir, added_on, is_mine
+        )
         if blob_file_obj:
             blob_file_obj.set_length(len(sd_data))
         if not sd_blob.get_is_verified():
@@ -252,20 +257,25 @@ class StreamDescriptor:
         iv_generator = iv_generator or random_iv_generator()
         key = key or os.urandom(AES.block_size // 8)
         blob_num = -1
+        added_on = time.time()
         async for blob_bytes in file_reader(file_path):
             blob_num += 1
             blob_info = await BlobFile.create_from_unencrypted(
-                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback
+                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, added_on, True, blob_completed_callback
             )
             blobs.append(blob_info)
         blobs.append(
-            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode()))  # add the stream terminator
+            # add the stream terminator
+            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), None, added_on, True)
+        )
         file_name = os.path.basename(file_path)
         suggested_file_name = sanitize_file_name(file_name)
         descriptor = cls(
             loop, blob_dir, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
         )
-        sd_blob = await descriptor.make_sd_blob(old_sort=old_sort, blob_completed_callback=blob_completed_callback)
+        sd_blob = await descriptor.make_sd_blob(
+            old_sort=old_sort, blob_completed_callback=blob_completed_callback, added_on=added_on, is_mine=True
+        )
         descriptor.sd_hash = sd_blob.blob_hash
         return descriptor


@@ -236,7 +236,7 @@ class StreamManager(SourceManager):
             blob_completed_callback=self.blob_manager.blob_completed
         )
         await self.storage.store_stream(
-            self.blob_manager.get_blob(descriptor.sd_hash), descriptor
+            self.blob_manager.get_blob(descriptor.sd_hash, is_mine=True), descriptor
         )
         row_id = await self.storage.save_published_file(
             descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0


@@ -649,6 +649,9 @@ class CommandTestCase(IntegrationTestCase):
     async def transaction_list(self, *args, **kwargs):
         return (await self.out(self.daemon.jsonrpc_transaction_list(*args, **kwargs)))['items']

+    async def blob_list(self, *args, **kwargs):
+        return (await self.out(self.daemon.jsonrpc_blob_list(*args, **kwargs)))['items']
+
     @staticmethod
     def get_claim_id(tx):
         return tx['outputs'][0]['claim_id']


@@ -515,16 +515,49 @@ class FileCommands(CommandTestCase):

 class DiskSpaceManagement(CommandTestCase):

+    async def get_referenced_blobs(self, tx):
+        sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
+        stream_hash = await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
+        return tx['outputs'][0]['value']['source']['sd_hash'], set(await self.blob_list(
+            stream_hash=stream_hash
+        ))
+
     async def test_file_management(self):
         status = await self.status()
         self.assertIn('disk_space', status)
         self.assertEqual('0', status['disk_space']['space_used'])
         self.assertEqual(True, status['disk_space']['running'])
-        await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
-        await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
-        self.assertEqual('5', (await self.status())['disk_space']['space_used'])
+        sd_hash1, blobs1 = await self.get_referenced_blobs(
+            await self.stream_create('foo1', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+        sd_hash2, blobs2 = await self.get_referenced_blobs(
+            await self.stream_create('foo2', '0.01', data=('0' * 3 * 1024 * 1024).encode())
+        )
+        sd_hash3, blobs3 = await self.get_referenced_blobs(
+            await self.stream_create('foo3', '0.01', data=('0' * 3 * 1024 * 1024).encode())
+        )
+        sd_hash4, blobs4 = await self.get_referenced_blobs(
+            await self.stream_create('foo4', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+
+        await self.daemon.storage.update_blob_ownership(sd_hash1, False)
+        await self.daemon.storage.update_blob_ownership(sd_hash3, False)
+        await self.daemon.storage.update_blob_ownership(sd_hash4, False)
+
+        self.assertEqual('10', (await self.status())['disk_space']['space_used'])
+        self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
+
         await self.blob_clean()
-        self.assertEqual('5', (await self.status())['disk_space']['space_used'])
-        self.daemon.conf.blob_storage_limit = 3
+
+        self.assertEqual('10', (await self.status())['disk_space']['space_used'])
+        self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
+
+        self.daemon.conf.blob_storage_limit = 6
         await self.blob_clean()
-        self.assertEqual('3', (await self.status())['disk_space']['space_used'])
+
+        self.assertEqual('5', (await self.status())['disk_space']['space_used'])
+        blobs = set(await self.blob_list())
+        self.assertFalse(blobs1.issubset(blobs))
+        self.assertTrue(blobs2.issubset(blobs))
+        self.assertFalse(blobs3.issubset(blobs))
+        self.assertTrue(blobs4.issubset(blobs))
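
The expected '5' falls out of the budget arithmetic. A sketch (sizes in MB, ignoring the few bytes of sd and terminator blobs, which round away): the four streams store 2+3+3+2 = 10; streams 1, 3 and 4 are marked not-mine; the cleaner walks not-mine blobs oldest first against a 6 MB limit:

    limit, used = 6, 10
    available = limit - used      # -4: over budget, cleaning kicks in
    available += 2                # foo1's blobs (oldest, not mine) freed
    assert available == -2        # still over; foo2 is mine, so it is skipped
    available += 3                # foo3's blobs freed
    assert available == 1         # positive: stop before touching foo4
    assert used - (2 + 3) == 5    # foo2 (mine) and foo4 survive: '5' MB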


@@ -5,7 +5,7 @@ from lbry.testcase import AsyncioTestCase
 from lbry.conf import Config
 from lbry.extras import cli
 from lbry.extras.daemon.components import (
-    DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
+    DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
     HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
     UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT,
     LIBTORRENT_COMPONENT
@@ -21,7 +21,7 @@ class CLIIntegrationTest(AsyncioTestCase):
         conf.share_usage_data = False
         conf.api = 'localhost:5299'
         conf.components_to_skip = (
-            DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
+            DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
             HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
             UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT,
             LIBTORRENT_COMPONENT


@@ -1,30 +0,0 @@
-import os
-import unittest
-import tempfile
-
-import lbry.wallet
-from lbry.conf import Config
-from lbry.blob.disk_space_manager import DiskSpaceManager
-
-
-class ConfigurationTests(unittest.TestCase):
-
-    def test_space_management(self):
-        with tempfile.TemporaryDirectory() as temp_dir:
-            os.mkdir(os.path.join(temp_dir, 'blobfiles'))
-            config = Config(
-                blob_storage_limit=5,
-                data_dir=temp_dir,
-                wallet_dir=temp_dir,
-                config=os.path.join(temp_dir, 'settings.yml'),
-            )
-            dsm = DiskSpaceManager(config)
-            self.assertEqual(0, dsm.space_used_mb)
-            for file_no in range(10):
-                with open(os.path.join(config.data_dir, 'blobfiles', f'3mb-{file_no}'), 'w') as blob:
-                    blob.write('0' * 1 * 1024 * 1024)
-            self.assertEqual(10, dsm.space_used_mb)
-            self.assertTrue(dsm.clean())
-            self.assertEqual(5, dsm.space_used_mb)
-            self.assertFalse(dsm.clean())


@@ -3,7 +3,7 @@ from lbry.testcase import AsyncioTestCase, AdvanceTimeTestCase

 from lbry.conf import Config
 from lbry.extras.daemon.componentmanager import ComponentManager
-from lbry.extras.daemon.components import DATABASE_COMPONENT, DHT_COMPONENT
+from lbry.extras.daemon.components import DATABASE_COMPONENT, DISK_SPACE_COMPONENT, DHT_COMPONENT
 from lbry.extras.daemon.components import HASH_ANNOUNCER_COMPONENT, UPNP_COMPONENT
 from lbry.extras.daemon.components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
 from lbry.extras.daemon import components
@@ -15,7 +15,6 @@ class TestComponentManager(AsyncioTestCase):
         self.default_components_sort = [
             [
                 components.DatabaseComponent,
-                components.DiskSpaceComponent,
                 components.ExchangeRateManagerComponent,
                 components.TorrentComponent,
                 components.UPnPComponent
@@ -26,6 +25,7 @@ class TestComponentManager(AsyncioTestCase):
                 components.WalletComponent
             ],
             [
+                components.DiskSpaceComponent,
                 components.FileManagerComponent,
                 components.HashAnnouncerComponent,
                 components.PeerProtocolServerComponent,
@@ -153,7 +153,7 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
         self.component_manager = ComponentManager(
             Config(),
             skip_components=[
-                DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT,
+                DATABASE_COMPONENT, DISK_SPACE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT,
                 PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT,
                 EXCHANGE_RATE_MANAGER_COMPONENT],
             wallet=FakeDelayedWallet,


@@ -81,7 +81,7 @@ class StorageTest(AsyncioTestCase):
         await self.storage.close()

     async def store_fake_blob(self, blob_hash, length=100):
-        await self.storage.add_blobs((blob_hash, length), finished=True)
+        await self.storage.add_blobs((blob_hash, length, 0, 0), finished=True)

     async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
         blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]


@@ -8,7 +8,7 @@ from lbry.testcase import AsyncioTestCase
 from lbry.conf import Config
 from lbry.extras.daemon.security import is_request_allowed as allowed, ensure_request_allowed as ensure
 from lbry.extras.daemon.components import (
-    DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
+    DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
     HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
     UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT,
     LIBTORRENT_COMPONENT
@@ -69,7 +69,7 @@ class TestAccessHeaders(AsyncioTestCase):
         conf.share_usage_data = False
         conf.api = 'localhost:5299'
         conf.components_to_skip = (
-            DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
+            DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
             HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
             UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT,
             LIBTORRENT_COMPONENT