forked from LBRYCommunity/lbry-sdk

commit 60836d8523 (parent ef89c2e47a)
db migration and other fixes

9 changed files with 109 additions and 45 deletions
@@ -72,14 +72,15 @@ class AbstractBlob:
         'verified',
         'writing',
         'readers',
-        'is_mine',
         'added_on',
+        'is_mine',
     ]
 
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-                 blob_directory: typing.Optional[str] = None, is_mine: bool = False,
-                 added_on: typing.Optional[int] = None):
+    def __init__(
+        self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
+        blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
+        blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False,
+    ):
         self.loop = loop
         self.blob_hash = blob_hash
         self.length = length

@@ -89,8 +90,8 @@ class AbstractBlob:
         self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
         self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
         self.readers: typing.List[typing.BinaryIO] = []
-        self.is_mine = is_mine
         self.added_on = added_on or time.time()
+        self.is_mine = is_mine
 
         if not is_valid_blobhash(blob_hash):
             raise InvalidBlobHashError(blob_hash)
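For orientation, a minimal sketch (not part of the commit) of constructing a blob under the reordered signature; the module path and the 96-character hash below are assumptions:

import asyncio
import time

from lbry.blob.blob_file import BlobBuffer  # module path assumed from the classes in this diff


async def main():
    loop = asyncio.get_running_loop()
    # added_on now precedes is_mine, matching the reordered __slots__ above;
    # both are optional, and added_on falls back to time.time() inside __init__.
    return BlobBuffer(
        loop, 'ab' * 48,  # placeholder 96-character hex blob hash
        added_on=int(time.time()),
        is_mine=True,
    )


asyncio.run(main())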
@@ -186,20 +187,21 @@ class AbstractBlob:
 
     @classmethod
     async def create_from_unencrypted(
             cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
-            unencrypted: bytes, blob_num: int,
-            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo:
+            unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
+    ) -> BlobInfo:
         """
         Create an encrypted BlobFile from plaintext bytes
         """
 
         blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
         length = len(blob_bytes)
-        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir)
+        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir, added_on, is_mine)
         writer = blob.get_blob_writer()
         writer.write(blob_bytes)
         await blob.verified.wait()
-        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash)
+        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash, added_on, is_mine)
 
     def save_verified_blob(self, verified_bytes: bytes):
         if self.verified.is_set():
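A hedged sketch of the updated call site, assuming BlobFile is importable from lbry.blob.blob_file: added_on and is_mine now sit between blob_num and blob_completed_callback, and both are threaded through to the returned BlobInfo.

import asyncio
import os
import time

from lbry.blob.blob_file import BlobFile  # module path assumed


async def write_blob(blob_dir: str, key: bytes, chunk: bytes, blob_num: int):
    # The same added_on/is_mine pair ends up on the blob object and in the
    # BlobInfo row, so the database and in-memory state stay in agreement.
    return await BlobFile.create_from_unencrypted(
        asyncio.get_running_loop(), blob_dir, key, os.urandom(16), chunk, blob_num,
        added_on=int(time.time()), is_mine=True,
    )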
@@ -254,11 +256,13 @@ class BlobBuffer(AbstractBlob):
     """
     An in-memory only blob
     """
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-                 blob_directory: typing.Optional[str] = None):
+    def __init__(
+        self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
+        blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
+        blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
+    ):
         self._verified_bytes: typing.Optional[BytesIO] = None
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
 
     @contextlib.contextmanager
     def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:

@@ -295,10 +299,12 @@ class BlobFile(AbstractBlob):
     """
     A blob existing on the local file system
     """
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-                 blob_directory: typing.Optional[str] = None):
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
+    def __init__(
+        self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
+        blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
+        blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
+    ):
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
         if not blob_directory or not os.path.isdir(blob_directory):
             raise OSError(f"invalid blob directory '{blob_directory}'")
         self.file_path = os.path.join(self.blob_directory, self.blob_hash)

@@ -349,12 +355,12 @@ class BlobFile(AbstractBlob):
 
     @classmethod
     async def create_from_unencrypted(
             cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
-            unencrypted: bytes, blob_num: int,
-            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
-                                                                     asyncio.Task]] = None) -> BlobInfo:
+            unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
+    ) -> BlobInfo:
         if not blob_dir or not os.path.isdir(blob_dir):
             raise OSError(f"cannot create blob in directory: '{blob_dir}'")
         return await super().create_from_unencrypted(
-            loop, blob_dir, key, iv, unencrypted, blob_num, blob_completed_callback
+            loop, blob_dir, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
         )
@@ -25,12 +25,14 @@ class DiskSpaceManager:
         if not self.config.blob_storage_limit:
             return 0
         delete = []
-        available = (self.config.blob_storage_limit*1024*1024) - await self.db.get_stored_blob_disk_usage()
-        for blob_hash, file_size in await self.db.get_stored_blobs(is_mine=False):
+        available = (self.config.blob_storage_limit*1024*1024) - await self.get_space_used_bytes()
+        if available > 0:
+            return 0
+        for blob_hash, file_size, added_on in await self.db.get_stored_blobs(is_mine=False):
+            delete.append(blob_hash)
             available += file_size
             if available > 0:
                 break
-            delete.append(blob_hash)
         if delete:
             await self.blob_manager.delete_blobs(delete, delete_from_db=True)
         return len(delete)
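To make the accounting concrete, a standalone simulation (hypothetical sizes and hashes) of the loop above: foreign (is_mine=0) blobs are appended oldest-first until the running balance turns positive.

def simulate_clean(limit_mb: int, not_mine_blobs, used_bytes: int) -> list:
    # Mirrors the logic above: bail out early when under the limit, otherwise
    # evict not-mine blobs in added_on order until enough space is freed.
    delete = []
    available = limit_mb * 1024 * 1024 - used_bytes
    if available > 0:
        return delete
    for blob_hash, file_size, added_on in sorted(not_mine_blobs, key=lambda b: b[2]):
        delete.append(blob_hash)
        available += file_size
        if available > 0:
            break
    return delete


mb = 1024 * 1024
# 9 MB stored against a 5 MB limit: the oldest 3 MB and 2 MB foreign blobs go.
print(simulate_clean(5, [('hash1', 3 * mb, 100), ('hash3', 2 * mb, 300), ('hash4', 2 * mb, 400)], 9 * mb))
# -> ['hash1', 'hash3']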
@@ -61,7 +61,7 @@ class DatabaseComponent(Component):
 
     @staticmethod
     def get_current_db_revision():
-        return 14
+        return 15
 
     @property
     def revision_filename(self):
@@ -35,6 +35,8 @@ def migrate_db(conf, start, end):
         from .migrate12to13 import do_migration
     elif current == 13:
         from .migrate13to14 import do_migration
+    elif current == 14:
+        from .migrate14to15 import do_migration
     else:
         raise Exception(f"DB migration of version {current} to {current+1} is not available")
     try:
lbry/extras/daemon/migrator/migrate14to15.py (new file, 16 lines)

@@ -0,0 +1,16 @@
+import os
+import sqlite3
+
+
+def do_migration(conf):
+    db_path = os.path.join(conf.data_dir, "lbrynet.sqlite")
+    connection = sqlite3.connect(db_path)
+    cursor = connection.cursor()
+
+    cursor.executescript("""
+        alter table blob add column added_on integer not null default 0;
+        alter table blob add column is_mine integer not null default 0;
+    """)
+
+    connection.commit()
+    connection.close()
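A quick way to exercise the new migration in isolation (illustrative only; the scratch blob table is deliberately minimal and SimpleNamespace stands in for the daemon conf object):

import os
import sqlite3
import tempfile
from types import SimpleNamespace

from lbry.extras.daemon.migrator.migrate14to15 import do_migration

data_dir = tempfile.mkdtemp()
db = sqlite3.connect(os.path.join(data_dir, "lbrynet.sqlite"))
db.execute("create table blob (blob_hash char(96) primary key, blob_length integer)")
db.execute("insert into blob values (?, ?)", ('ab' * 48, 2097152))
db.commit()
db.close()

do_migration(SimpleNamespace(data_dir=data_dir))

db = sqlite3.connect(os.path.join(data_dir, "lbrynet.sqlite"))
print(db.execute("select blob_hash, added_on, is_mine from blob").fetchall())
# existing rows come out with added_on=0 and is_mine=0 until backfilled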
@@ -437,23 +437,27 @@ class SQLiteStorage(SQLiteMixin):
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")
 
-    async def get_stored_blobs(self, is_mine):
+    async def get_stored_blobs(self, is_mine: bool):
+        is_mine = 1 if is_mine else 0
         return await self.db.execute_fetchall(
-            "select blob_hash, blob_length from blob where is_mine=? order by added_on", (is_mine,)
+            "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc",
+            (is_mine,)
         )
 
-    async def get_stored_blob_disk_usage(self, is_mine=None):
+    async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None):
         if is_mine is None:
             sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
         else:
+            is_mine = 1 if is_mine else 0
             sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
         return (await self.db.execute_fetchone(sql, args))[0]
 
-    async def update_blob_ownership(self, stream_hash, is_mine: bool):
+    async def update_blob_ownership(self, sd_hash, is_mine: bool):
+        is_mine = 1 if is_mine else 0
         await self.db.execute_fetchall(
             "update blob set is_mine = ? where blob_hash in ("
-            " select blob_hash from stream_blob where stream_hash = ?"
+            " select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
-            ")", (1 if is_mine else 0, stream_hash)
+            ")", (is_mine, sd_hash)
         )
 
     def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
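Illustrative only: the `1 if is_mine else 0` normalization exists because blob.is_mine is an integer flag, and the ordering clause is what makes the disk space manager see the oldest foreign blobs first. A minimal in-memory demo of the query shape:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table blob (blob_hash text, blob_length integer, added_on integer, is_mine integer)")
conn.executemany("insert into blob values (?, ?, ?, ?)", [
    ("hash1", 3, 100, 0), ("hash2", 2, 200, 1), ("hash3", 2, 300, 0),
])
is_mine = 1 if False else 0  # the same normalization as get_stored_blobs
print(conn.execute(
    "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc",
    (is_mine,),
).fetchall())
# -> [('hash1', 3, 100), ('hash3', 2, 300)]: oldest foreign blob first,
#    which is the order the disk space manager consumes them in.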
@@ -4,6 +4,7 @@ import binascii
 import logging
 import typing
 import asyncio
+import time
 import re
 from collections import OrderedDict
 from cryptography.hazmat.primitives.ciphers.algorithms import AES

@@ -252,14 +253,17 @@ class StreamDescriptor:
         iv_generator = iv_generator or random_iv_generator()
         key = key or os.urandom(AES.block_size // 8)
         blob_num = -1
+        added_on = time.time()
         async for blob_bytes in file_reader(file_path):
             blob_num += 1
             blob_info = await BlobFile.create_from_unencrypted(
-                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback
+                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, added_on, True, blob_completed_callback
             )
             blobs.append(blob_info)
         blobs.append(
-            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode()))  # add the stream terminator
+            # add the stream terminator
+            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), None, added_on, True)
+        )
         file_name = os.path.basename(file_path)
         suggested_file_name = sanitize_file_name(file_name)
         descriptor = cls(
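From the two call sites above, BlobInfo's argument order is (blob_num, length, iv hex, blob_hash, added_on, is_mine); the timestamp is sampled once per stream, so every blob, including the zero-length terminator, shares it. A small sketch of the terminator arguments (hypothetical iv):

import binascii
import os
import time

added_on = time.time()                                  # one timestamp for the whole stream
iv_hex = binascii.hexlify(os.urandom(16)).decode()      # hypothetical terminator iv
terminator_args = (3, 0, iv_hex, None, added_on, True)  # blob_num=3, length=0, blob_hash=None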
@@ -649,6 +649,9 @@ class CommandTestCase(IntegrationTestCase):
     async def transaction_list(self, *args, **kwargs):
         return (await self.out(self.daemon.jsonrpc_transaction_list(*args, **kwargs)))['items']
 
+    async def blob_list(self, *args, **kwargs):
+        return (await self.out(self.daemon.jsonrpc_blob_list(*args, **kwargs)))['items']
+
     @staticmethod
     def get_claim_id(tx):
         return tx['outputs'][0]['claim_id']
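Usage inside an integration test (the stream_hash keyword is simply forwarded to jsonrpc_blob_list, which the disk-space test below relies on):

# inside an async test method:
all_blobs = set(await self.blob_list())
stream_blobs = set(await self.blob_list(stream_hash=stream_hash))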
@@ -515,19 +515,46 @@ class FileCommands(CommandTestCase):
 
 class DiskSpaceManagement(CommandTestCase):
 
+    async def get_referenced_blobs(self, tx):
+        sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
+        stream_hash = await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
+        return tx['outputs'][0]['value']['source']['sd_hash'], set(await self.blob_list(
+            stream_hash=stream_hash
+        ))
+
     async def test_file_management(self):
         status = await self.status()
         self.assertIn('disk_space', status)
         self.assertEqual('0', status['disk_space']['space_used'])
         self.assertEqual(True, status['disk_space']['running'])
-        await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
-        await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
-        stream = await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
-        stream_hash = stream['outputs'][0]['value']['source']['hash']
-        await self.daemon.storage.update_blob_ownership(stream_hash, False)
-        self.assertEqual('7', (await self.status())['disk_space']['space_used'])
+        sd_hash1, blobs1 = await self.get_referenced_blobs(
+            await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
+        )
+        sd_hash2, blobs2 = await self.get_referenced_blobs(
+            await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+        sd_hash3, blobs3 = await self.get_referenced_blobs(
+            await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+        sd_hash4, blobs4 = await self.get_referenced_blobs(
+            await self.stream_create('foo4', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+        )
+
+        await self.daemon.storage.update_blob_ownership(sd_hash1, False)
+        await self.daemon.storage.update_blob_ownership(sd_hash3, False)
+        await self.daemon.storage.update_blob_ownership(sd_hash4, False)
+
+        self.assertEqual('9', (await self.status())['disk_space']['space_used'])
+        self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
+
         await self.blob_clean()
-        self.assertEqual('7', (await self.status())['disk_space']['space_used'])
-        self.daemon.conf.blob_storage_limit = 3
+
+        self.assertEqual('9', (await self.status())['disk_space']['space_used'])
+        self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
+
+        self.daemon.conf.blob_storage_limit = 5
         await self.blob_clean()
-        self.assertEqual('2', (await self.status())['disk_space']['space_used'])
+
+        self.assertEqual('4', (await self.status())['disk_space']['space_used'])
+        blobs = set(await self.blob_list())
+        self.assertEqual(blobs2 | blobs4, set(await self.blob_list()))
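The expected strings follow from the eviction arithmetic above (space_used reports whole megabytes); a sketch of the bookkeeping under the stream sizes the test creates:

sizes_mb = {'foo1': 3, 'foo2': 2, 'foo3': 2, 'foo4': 2}   # stream sizes created above
assert sum(sizes_mb.values()) == 9                        # hence space_used == '9'
# foo1, foo3 and foo4 are marked is_mine=False; with blob_storage_limit = 5,
# blob_clean evicts the oldest foreign streams (foo1, then foo3) until under
# the limit, leaving foo2 + foo4:
assert sizes_mb['foo2'] + sizes_mb['foo4'] == 4           # hence space_used == '4'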