Add support for hierarchical structure within blob files dir.
Add --blob-dirs argument to allow extra blob dirs outside of the main "data dir".
This commit is contained in:
parent
6258651650
commit
eb2bbca100
11 changed files with 170 additions and 69 deletions
|
@ -19,6 +19,9 @@ from lbry.blob import MAX_BLOB_SIZE, BLOBHASH_LENGTH
|
|||
from lbry.blob.blob_info import BlobInfo
|
||||
from lbry.blob.writer import HashBlobWriter
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.blob.blob_manager import BlobManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -79,13 +82,20 @@ class AbstractBlob:
|
|||
def __init__(
|
||||
self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
|
||||
blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False,
|
||||
blob_manager: typing.Optional['BlobManager'] = None,
|
||||
added_on: typing.Optional[int] = None, is_mine: bool = False,
|
||||
):
|
||||
if not is_valid_blobhash(blob_hash):
|
||||
raise InvalidBlobHashError(blob_hash)
|
||||
from lbry.blob.blob_manager import BlobManager # pylint: disable=import-outside-toplevel
|
||||
if not isinstance(blob_manager, BlobManager):
|
||||
raise TypeError(f"{type(blob_manager)} not instance of BlobManager")
|
||||
|
||||
self.loop = loop
|
||||
self.blob_hash = blob_hash
|
||||
self.length = length
|
||||
self.blob_completed_callback = blob_completed_callback
|
||||
self.blob_directory = blob_directory
|
||||
self.blob_directory, _ = blob_manager._blob_dir(blob_hash)
|
||||
self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
|
||||
self.verified: asyncio.Event = asyncio.Event()
|
||||
self.writing: asyncio.Event = asyncio.Event()
|
||||
|
@ -93,8 +103,8 @@ class AbstractBlob:
|
|||
self.added_on = added_on or time.time()
|
||||
self.is_mine = is_mine
|
||||
|
||||
if not is_valid_blobhash(blob_hash):
|
||||
raise InvalidBlobHashError(blob_hash)
|
||||
if not self.blob_directory or not os.path.isdir(self.blob_directory):
|
||||
raise OSError(f"cannot create blob in directory: '{self.blob_directory}'")
|
||||
|
||||
def __del__(self):
|
||||
if self.writers or self.readers:
|
||||
|
@ -187,7 +197,7 @@ class AbstractBlob:
|
|||
|
||||
@classmethod
|
||||
async def create_from_unencrypted(
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', key: bytes, iv: bytes,
|
||||
unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
|
||||
) -> BlobInfo:
|
||||
|
@ -197,7 +207,7 @@ class AbstractBlob:
|
|||
|
||||
blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
|
||||
length = len(blob_bytes)
|
||||
blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir, added_on, is_mine)
|
||||
blob = cls(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
|
||||
writer = blob.get_blob_writer()
|
||||
writer.write(blob_bytes)
|
||||
await blob.verified.wait()
|
||||
|
@ -259,10 +269,11 @@ class BlobBuffer(AbstractBlob):
|
|||
def __init__(
|
||||
self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
|
||||
blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
|
||||
blob_manager: typing.Optional['BlobManager'] = None,
|
||||
added_on: typing.Optional[int] = None, is_mine: bool = False
|
||||
):
|
||||
self._verified_bytes: typing.Optional[BytesIO] = None
|
||||
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
|
||||
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
|
||||
|
@ -302,11 +313,10 @@ class BlobFile(AbstractBlob):
|
|||
def __init__(
|
||||
self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
|
||||
blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
|
||||
blob_manager: typing.Optional['BlobManager'] = None,
|
||||
added_on: typing.Optional[int] = None, is_mine: bool = False
|
||||
):
|
||||
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
|
||||
if not blob_directory or not os.path.isdir(blob_directory):
|
||||
raise OSError(f"invalid blob directory '{blob_directory}'")
|
||||
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
|
||||
self.file_path = os.path.join(self.blob_directory, self.blob_hash)
|
||||
if self.file_exists:
|
||||
file_size = int(os.stat(self.file_path).st_size)
|
||||
|
@ -355,12 +365,10 @@ class BlobFile(AbstractBlob):
|
|||
|
||||
@classmethod
|
||||
async def create_from_unencrypted(
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', key: bytes, iv: bytes,
|
||||
unencrypted: bytes, blob_num: int, added_on: float, is_mine: bool,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
|
||||
) -> BlobInfo:
|
||||
if not blob_dir or not os.path.isdir(blob_dir):
|
||||
raise OSError(f"cannot create blob in directory: '{blob_dir}'")
|
||||
return await super().create_from_unencrypted(
|
||||
loop, blob_dir, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
|
||||
loop, blob_manager, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
|
||||
)
|
||||
|
|
|
@ -2,8 +2,16 @@ import os
|
|||
import typing
|
||||
import asyncio
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from lbry.utils import LRUCacheWithMetrics
|
||||
from lbry.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
|
||||
from lbry.blob import BLOBHASH_LENGTH
|
||||
from lbry.blob.blob_file import (
|
||||
HEXMATCH,
|
||||
is_valid_blobhash,
|
||||
BlobFile,
|
||||
BlobBuffer,
|
||||
AbstractBlob,
|
||||
)
|
||||
from lbry.stream.descriptor import StreamDescriptor
|
||||
from lbry.connection_manager import ConnectionManager
|
||||
|
||||
|
@ -16,16 +24,19 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class BlobManager:
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config',
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dirs: typing.List[str],
|
||||
storage: 'SQLiteStorage', config: 'Config',
|
||||
node_data_store: typing.Optional['DictDataStore'] = None):
|
||||
"""
|
||||
This class stores blobs on the hard disk
|
||||
|
||||
blob_dir - directory where blobs are stored
|
||||
blob_dirs - directories where blobs are stored
|
||||
storage - SQLiteStorage object
|
||||
"""
|
||||
self.loop = loop
|
||||
self.blob_dir = blob_dir
|
||||
self.blob_dirs = defaultdict(list)
|
||||
self.blob_dirs.update({ '': blob_dirs if isinstance(blob_dirs, list) else [blob_dirs]})
|
||||
self.blob_dirs_max_prefix_len = 0 # Maximum key length in "blob_dirs" dictionary.
|
||||
self.storage = storage
|
||||
self._node_data_store = node_data_store
|
||||
self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\
|
||||
|
@ -36,14 +47,37 @@ class BlobManager:
|
|||
self.config.blob_lru_cache_size)
|
||||
self.connection_manager = ConnectionManager(loop)
|
||||
|
||||
def _blob_dir(self, blob_hash: str) -> typing.Tuple[str, bool]:
|
||||
"""
|
||||
Locate blob directory matching longest prefix of blob hash.
|
||||
An existing blob is preferred, even if it doesn't reside in
|
||||
the directory with longest prefix.
|
||||
"""
|
||||
best_dir = None
|
||||
for prefix in [blob_hash[:i] for i in range(min(len(blob_hash), self.blob_dirs_max_prefix_len), -1, -1)]:
|
||||
if prefix in self.blob_dirs:
|
||||
if not best_dir:
|
||||
best_dir = self.blob_dirs[prefix][0]
|
||||
for path in self.blob_dirs[prefix]:
|
||||
if os.path.isfile(os.path.join(path, blob_hash)):
|
||||
#print(f'blob {blob_hash} FOUND at location: {path}')
|
||||
return path, True
|
||||
#print(f'blob {blob_hash} has BEST location: {best_dir}')
|
||||
return best_dir, False
|
||||
|
||||
|
||||
def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None, is_mine: bool = False):
|
||||
if self.config.save_blobs or (
|
||||
is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))):
|
||||
if self.config.save_blobs:
|
||||
return BlobFile(
|
||||
self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
|
||||
self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
|
||||
)
|
||||
_, blob_found = self._blob_dir(blob_hash)
|
||||
if blob_found:
|
||||
return BlobFile(
|
||||
self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
|
||||
)
|
||||
return BlobBuffer(
|
||||
self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
|
||||
self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
|
||||
)
|
||||
|
||||
def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool = False):
|
||||
|
@ -65,21 +99,39 @@ class BlobManager:
|
|||
def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
|
||||
if not is_valid_blobhash(blob_hash):
|
||||
raise ValueError(blob_hash)
|
||||
if not os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
|
||||
_, blob_found = self._blob_dir(blob_hash)
|
||||
if not blob_found:
|
||||
return False
|
||||
if blob_hash in self.blobs:
|
||||
return self.blobs[blob_hash].get_is_verified()
|
||||
return self._get_blob(blob_hash, length).get_is_verified()
|
||||
|
||||
async def setup(self) -> bool:
|
||||
def get_files_in_blob_dir() -> typing.Set[str]:
|
||||
if not self.blob_dir:
|
||||
return set()
|
||||
return {
|
||||
item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
|
||||
}
|
||||
def list_blobs(self, paths = None, prefix = '', setup=False):
|
||||
"""
|
||||
Recursively search for blob files within path(s) and subdirectories.
|
||||
When setup=True, subdirectories which are candidates for blob storage
|
||||
are added to the "self.blob_dirs" dictionary.
|
||||
"""
|
||||
blobfiles = set()
|
||||
subdirs = defaultdict(list)
|
||||
for path in paths if paths is not None else self.blob_dirs[prefix]:
|
||||
with os.scandir(path) as entries:
|
||||
for item in entries:
|
||||
if item.is_file() and is_valid_blobhash(item.name):
|
||||
blobfiles.add(item.name)
|
||||
elif item.is_dir() and len(prefix+item.name) < BLOBHASH_LENGTH and HEXMATCH.match(item.name):
|
||||
subdirs[item.name].append(item.path)
|
||||
# Recursively process subdirectories which may also contain blobs.
|
||||
for name, subdir_paths in subdirs.items():
|
||||
if setup:
|
||||
self.blob_dirs[prefix+name] = subdir_paths
|
||||
self.blob_dirs_max_prefix_len = max(self.blob_dirs_max_prefix_len, len(prefix+name))
|
||||
blobfiles.update(self.list_blobs(paths=subdir_paths, prefix=prefix+name, setup=setup))
|
||||
return blobfiles
|
||||
|
||||
in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir)
|
||||
async def setup(self) -> bool:
|
||||
in_blobfiles_dir = await self.loop.run_in_executor(None, lambda: self.list_blobs(setup=True))
|
||||
#print(f'blob dirs: {self.blob_dirs}')
|
||||
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
|
||||
if to_add:
|
||||
self.completed_blob_hashes.update(to_add)
|
||||
|
@ -97,7 +149,7 @@ class BlobManager:
|
|||
self.completed_blob_hashes.clear()
|
||||
|
||||
def get_stream_descriptor(self, sd_hash):
|
||||
return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash))
|
||||
return StreamDescriptor.from_stream_descriptor_blob(self.loop, self, self.get_blob(sd_hash))
|
||||
|
||||
def blob_completed(self, blob: AbstractBlob) -> asyncio.Task:
|
||||
if blob.blob_hash is None:
|
||||
|
@ -133,8 +185,9 @@ class BlobManager:
|
|||
raise Exception("invalid blob hash to delete")
|
||||
|
||||
if blob_hash not in self.blobs:
|
||||
if self.blob_dir and os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
|
||||
os.remove(os.path.join(self.blob_dir, blob_hash))
|
||||
blob_dir, blob_found = self._blob_dir(blob_hash)
|
||||
if blob_dir and blob_found:
|
||||
os.remove(os.path.join(blob_dir, blob_hash))
|
||||
else:
|
||||
self.blobs.pop(blob_hash).delete()
|
||||
if blob_hash in self.completed_blob_hashes:
|
||||
|
|
17
lbry/conf.py
17
lbry/conf.py
|
@ -284,6 +284,20 @@ class Strings(ListSetting):
|
|||
f"Value of '{string}' at index {idx} in setting " \
|
||||
f"'{self.name}' must be a string."
|
||||
|
||||
class Paths(Strings):
|
||||
|
||||
def validate(self, value):
|
||||
super().validate(value)
|
||||
for idx, path in enumerate(value):
|
||||
assert os.path.isdir(path), \
|
||||
f"Path '{path}' at index {idx} in setting " \
|
||||
f"'{self.name}' must be a path to a directory."
|
||||
|
||||
def __get__(self, obj, owner) -> List[str]:
|
||||
values = super().__get__(obj, owner)
|
||||
if isinstance(values, list):
|
||||
return [os.path.expanduser(os.path.expandvars(path)) for path in values]
|
||||
return values
|
||||
|
||||
class KnownHubsList:
|
||||
|
||||
|
@ -593,7 +607,7 @@ class Config(CLIConfig):
|
|||
jurisdiction = String("Limit interactions to wallet server in this jurisdiction.")
|
||||
|
||||
# directories
|
||||
data_dir = Path("Directory path to store blobs.", metavar='DIR')
|
||||
data_dir = Path("Directory path for daemon settings, blobs, logs, etc.", metavar='DIR')
|
||||
download_dir = Path(
|
||||
"Directory path to place assembled files downloaded from LBRY.",
|
||||
previous_names=['download_directory'], metavar='DIR'
|
||||
|
@ -638,6 +652,7 @@ class Config(CLIConfig):
|
|||
|
||||
# blob announcement and download
|
||||
save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
|
||||
blob_dirs = Paths("Additional directory path(s) for storing blobs.", [], metavar='DIR')
|
||||
network_storage_limit = Integer("Disk space in MB to be allocated for helping the P2P network. 0 = disable", 0)
|
||||
blob_storage_limit = Integer("Disk space in MB to be allocated for blob storage. 0 = no limit", 0)
|
||||
blob_lru_cache_size = Integer(
|
||||
|
|
|
@ -222,10 +222,28 @@ class BlobComponent(Component):
|
|||
dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
|
||||
if dht_node:
|
||||
data_store = dht_node.protocol.data_store
|
||||
blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
|
||||
if not os.path.isdir(blob_dir):
|
||||
os.mkdir(blob_dir)
|
||||
self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
|
||||
|
||||
# Each blob dir should have 3 levels of subdirs corresponding to hash prefixes.
|
||||
def setup_subdirs(path, depth):
|
||||
if depth <= 0:
|
||||
return
|
||||
for prefix in '0123456789abcdef':
|
||||
subdir = os.path.join(path, prefix)
|
||||
if not os.path.isdir(subdir):
|
||||
os.mkdir(subdir)
|
||||
#print(f'created blob subdir: {subdir}')
|
||||
setup_subdirs(subdir, depth-1)
|
||||
|
||||
# Set up any explict blob dirs plus a default <data_dir>/blobfiles.
|
||||
blob_dirs = self.conf.blob_dirs + [os.path.join(self.conf.data_dir, 'blobfiles')]
|
||||
#print(f'blob dirs: {blob_dirs}')
|
||||
for blob_dir in blob_dirs:
|
||||
if not os.path.isdir(blob_dir):
|
||||
os.mkdir(blob_dir)
|
||||
#print(f'created blob dir: {blob_dir}')
|
||||
setup_subdirs(blob_dir, 3)
|
||||
|
||||
self.blob_manager = BlobManager(self.component_manager.loop, blob_dirs, storage, self.conf, data_store)
|
||||
return await self.blob_manager.setup()
|
||||
|
||||
async def stop(self):
|
||||
|
|
|
@ -14,6 +14,9 @@ from lbry.blob.blob_file import AbstractBlob, BlobFile
|
|||
from lbry.utils import get_lbry_hash_obj
|
||||
from lbry.error import InvalidStreamDescriptorError
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.blob.blob_manager import BlobManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
RE_ILLEGAL_FILENAME_CHARS = re.compile(
|
||||
|
@ -83,7 +86,7 @@ def sanitize_file_name(dirty_name: str, default_file_name: str = 'lbry_download'
|
|||
class StreamDescriptor:
|
||||
__slots__ = [
|
||||
'loop',
|
||||
'blob_dir',
|
||||
'blob_manager',
|
||||
'stream_name',
|
||||
'key',
|
||||
'suggested_file_name',
|
||||
|
@ -92,11 +95,11 @@ class StreamDescriptor:
|
|||
'sd_hash'
|
||||
]
|
||||
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, stream_name: str, key: str,
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', stream_name: str, key: str,
|
||||
suggested_file_name: str, blobs: typing.List[BlobInfo], stream_hash: typing.Optional[str] = None,
|
||||
sd_hash: typing.Optional[str] = None):
|
||||
self.loop = loop
|
||||
self.blob_dir = blob_dir
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_name = stream_name
|
||||
self.key = key
|
||||
self.suggested_file_name = suggested_file_name
|
||||
|
@ -164,7 +167,7 @@ class StreamDescriptor:
|
|||
else:
|
||||
sd_data = self.old_sort_json()
|
||||
sd_blob = blob_file_obj or BlobFile(
|
||||
self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir, added_on, is_mine
|
||||
self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_manager, added_on, is_mine
|
||||
)
|
||||
if blob_file_obj:
|
||||
blob_file_obj.set_length(len(sd_data))
|
||||
|
@ -177,7 +180,7 @@ class StreamDescriptor:
|
|||
return sd_blob
|
||||
|
||||
@classmethod
|
||||
def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
|
||||
def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
|
||||
blob: AbstractBlob) -> 'StreamDescriptor':
|
||||
with blob.reader_context() as blob_reader:
|
||||
json_bytes = blob_reader.read()
|
||||
|
@ -196,7 +199,7 @@ class StreamDescriptor:
|
|||
raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
|
||||
added_on = time.time()
|
||||
descriptor = cls(
|
||||
loop, blob_dir,
|
||||
loop, blob_manager,
|
||||
binascii.unhexlify(decoded['stream_name']).decode(),
|
||||
decoded['key'],
|
||||
binascii.unhexlify(decoded['suggested_file_name']).decode(),
|
||||
|
@ -210,11 +213,11 @@ class StreamDescriptor:
|
|||
return descriptor
|
||||
|
||||
@classmethod
|
||||
async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
|
||||
async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
|
||||
blob: AbstractBlob) -> 'StreamDescriptor':
|
||||
if not blob.is_readable():
|
||||
raise InvalidStreamDescriptorError(f"unreadable/missing blob: {blob.blob_hash}")
|
||||
return await loop.run_in_executor(None, cls._from_stream_descriptor_blob, loop, blob_dir, blob)
|
||||
return await loop.run_in_executor(None, cls._from_stream_descriptor_blob, loop, blob_manager, blob)
|
||||
|
||||
@staticmethod
|
||||
def get_blob_hashsum(blob_dict: typing.Dict):
|
||||
|
@ -248,7 +251,8 @@ class StreamDescriptor:
|
|||
|
||||
@classmethod
|
||||
async def create_stream(
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None,
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
|
||||
file_path: str, key: typing.Optional[bytes] = None,
|
||||
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None,
|
||||
old_sort: bool = False,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
|
||||
|
@ -262,7 +266,8 @@ class StreamDescriptor:
|
|||
async for blob_bytes in file_reader(file_path):
|
||||
blob_num += 1
|
||||
blob_info = await BlobFile.create_from_unencrypted(
|
||||
loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, added_on, True, blob_completed_callback
|
||||
loop, blob_manager, key, next(iv_generator), blob_bytes, blob_num,
|
||||
added_on, True, blob_completed_callback
|
||||
)
|
||||
blobs.append(blob_info)
|
||||
blobs.append(
|
||||
|
@ -272,7 +277,7 @@ class StreamDescriptor:
|
|||
file_name = os.path.basename(file_path)
|
||||
suggested_file_name = sanitize_file_name(file_name)
|
||||
descriptor = cls(
|
||||
loop, blob_dir, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
|
||||
loop, blob_manager, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
|
||||
)
|
||||
sd_blob = await descriptor.make_sd_blob(
|
||||
old_sort=old_sort, blob_completed_callback=blob_completed_callback, added_on=added_on, is_mine=True
|
||||
|
@ -288,10 +293,11 @@ class StreamDescriptor:
|
|||
return self.lower_bound_decrypted_length() + (AES.block_size // 8)
|
||||
|
||||
@classmethod
|
||||
async def recover(cls, blob_dir: str, sd_blob: 'AbstractBlob', stream_hash: str, stream_name: str,
|
||||
async def recover(cls, blob_manager: 'BlobManager', sd_blob: 'AbstractBlob',
|
||||
stream_hash: str, stream_name: str,
|
||||
suggested_file_name: str, key: str,
|
||||
blobs: typing.List['BlobInfo']) -> typing.Optional['StreamDescriptor']:
|
||||
descriptor = cls(asyncio.get_event_loop(), blob_dir, stream_name, key, suggested_file_name,
|
||||
descriptor = cls(asyncio.get_event_loop(), blob_manager, stream_name, key, suggested_file_name,
|
||||
blobs, stream_hash, sd_blob.blob_hash)
|
||||
|
||||
if descriptor.calculate_sd_hash() == sd_blob.blob_hash: # first check for a normal valid sd
|
||||
|
|
|
@ -81,7 +81,7 @@ class StreamDownloader:
|
|||
|
||||
# parse the descriptor
|
||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
|
||||
self.loop, self.blob_manager.blob_dir, sd_blob
|
||||
self.loop, self.blob_manager, sd_blob
|
||||
)
|
||||
log.info("loaded stream manifest %s", self.sd_hash)
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
|||
try:
|
||||
await asyncio.wait_for(self.sd_blob.verified.wait(), 30)
|
||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
|
||||
self.loop, self.blob_manager.blob_dir, self.sd_blob
|
||||
self.loop, self.blob_manager, self.sd_blob
|
||||
)
|
||||
self.send_response({"received_sd_blob": True})
|
||||
except asyncio.TimeoutError:
|
||||
|
@ -109,7 +109,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
|||
self.writer = None
|
||||
else:
|
||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
|
||||
self.loop, self.blob_manager.blob_dir, self.sd_blob
|
||||
self.loop, self.blob_manager, self.sd_blob
|
||||
)
|
||||
self.incoming.clear()
|
||||
self.not_incoming.set()
|
||||
|
|
|
@ -78,7 +78,7 @@ class StreamManager(SourceManager):
|
|||
sd_blob = self.blob_manager.get_blob(sd_hash)
|
||||
blobs = await self.storage.get_blobs_for_stream(stream_hash)
|
||||
descriptor = await StreamDescriptor.recover(
|
||||
self.blob_manager.blob_dir, sd_blob, stream_hash, stream_name, suggested_file_name, key, blobs
|
||||
self.blob_manager, sd_blob, stream_hash, stream_name, suggested_file_name, key, blobs
|
||||
)
|
||||
if not descriptor:
|
||||
return
|
||||
|
@ -236,7 +236,7 @@ class StreamManager(SourceManager):
|
|||
async def create(self, file_path: str, key: Optional[bytes] = None,
|
||||
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
|
||||
descriptor = await StreamDescriptor.create_stream(
|
||||
self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
|
||||
self.loop, self.blob_manager, file_path, key=key, iv_generator=iv_generator,
|
||||
blob_completed_callback=self.blob_manager.blob_completed
|
||||
)
|
||||
await self.storage.store_stream(
|
||||
|
|
|
@ -199,7 +199,8 @@ class WalletNode:
|
|||
cleanup and self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
log.warning("skipping cleanup of data_path: %s", self.data_path)
|
||||
#shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
|
||||
|
||||
class SPVNode:
|
||||
|
|
|
@ -636,7 +636,7 @@ class DiskSpaceManagement(CommandTestCase):
|
|||
class TestBackgroundDownloaderComponent(CommandTestCase):
|
||||
async def get_blobs_from_sd_blob(self, sd_blob):
|
||||
descriptor = await StreamDescriptor.from_stream_descriptor_blob(
|
||||
asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob
|
||||
asyncio.get_running_loop(), self.daemon.blob_manager, sd_blob
|
||||
)
|
||||
return descriptor.blobs
|
||||
|
||||
|
|
|
@ -31,10 +31,10 @@ class RangeRequests(CommandTestCase):
|
|||
self.data = data
|
||||
await self.stream_create('foo', '0.01', data=self.data, file_size=file_size)
|
||||
if save_blobs:
|
||||
self.assertGreater(len(os.listdir(self.daemon.blob_manager.blob_dir)), 1)
|
||||
self.assertGreater(len(self.daemon.blob_manager.list_blobs()), 1)
|
||||
await (await self.daemon.jsonrpc_file_list())['items'][0].fully_reflected.wait()
|
||||
await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
|
||||
self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||
self.assertEqual(0, len(self.daemon.blob_manager.list_blobs()))
|
||||
# await self._restart_stream_manager()
|
||||
await self.daemon.streaming_runner.setup()
|
||||
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
|
||||
|
@ -313,20 +313,20 @@ class RangeRequests(CommandTestCase):
|
|||
self.daemon.conf.save_blobs = True
|
||||
blobs_in_stream = (await self.daemon.jsonrpc_file_list())['items'][0].blobs_in_stream
|
||||
sd_hash = (await self.daemon.jsonrpc_file_list())['items'][0].sd_hash
|
||||
start_file_count = len(os.listdir(self.daemon.blob_manager.blob_dir))
|
||||
start_file_count = len(self.daemon.blob_manager.list_blobs())
|
||||
await self._test_range_requests()
|
||||
self.assertEqual(start_file_count + blobs_in_stream, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||
self.assertEqual(start_file_count + blobs_in_stream, len(self.daemon.blob_manager.list_blobs()))
|
||||
self.assertEqual(0, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining)
|
||||
|
||||
# switch back
|
||||
self.daemon.conf.save_blobs = False
|
||||
await self._test_range_requests()
|
||||
self.assertEqual(start_file_count + blobs_in_stream, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||
self.assertEqual(start_file_count + blobs_in_stream, len(self.daemon.blob_manager.list_blobs()))
|
||||
self.assertEqual(0, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining)
|
||||
await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, sd_hash=sd_hash)
|
||||
self.assertEqual(start_file_count, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||
self.assertEqual(start_file_count, len(self.daemon.blob_manager.list_blobs()))
|
||||
await self._test_range_requests()
|
||||
self.assertEqual(start_file_count, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||
self.assertEqual(start_file_count, len(self.daemon.blob_manager.list_blobs()))
|
||||
self.assertEqual(blobs_in_stream, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining)
|
||||
|
||||
async def test_file_save_streaming_only_save_blobs(self):
|
||||
|
@ -400,7 +400,7 @@ class RangeRequestsLRUCache(CommandTestCase):
|
|||
await self.stream_create('foo', '0.01', data=self.data, file_size=0)
|
||||
await (await self.daemon.jsonrpc_file_list())['items'][0].fully_reflected.wait()
|
||||
await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
|
||||
self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||
self.assertEqual(0, len(self.daemon.blob_manager.list_blobs()))
|
||||
|
||||
await self.daemon.streaming_runner.setup()
|
||||
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
|
||||
|
|
Loading…
Add table
Reference in a new issue