forked from LBRYCommunity/lbry-sdk

Commit 46b166952b (parent 1375153fda): non async blob_completed callback, tests

16 changed files with 185 additions and 117 deletions
@@ -73,7 +73,7 @@ class AbstractBlob:
     ]

     def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]] = None,
+                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
                  blob_directory: typing.Optional[str] = None):
        self.loop = loop
        self.blob_hash = blob_hash
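Note: the hunk above only tightens the callback's type: blob_completed_callback is now a plain callable returning None rather than a coroutine function returning an Awaitable. A minimal sketch of what each annotation admits (runnable on its own; the 'AbstractBlob' forward reference is just a string here):

    import typing

    # old: the callback returned an awaitable, so callers had to schedule or await it
    AsyncCallback = typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]]

    # new: the callback is an ordinary function; callers can invoke it inline
    SyncCallback = typing.Optional[typing.Callable[['AbstractBlob'], None]]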
@@ -170,15 +170,17 @@ class AbstractBlob:
         return decrypt_blob_bytes(reader, self.length, key, iv)

     @classmethod
-    async def create_from_unencrypted(cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes,
-                                      iv: bytes, unencrypted: bytes, blob_num: int) -> BlobInfo:
+    async def create_from_unencrypted(
+            cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
+            unencrypted: bytes, blob_num: int,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo:
         """
         Create an encrypted BlobFile from plaintext bytes
         """

         blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
         length = len(blob_bytes)
-        blob = cls(loop, blob_hash, length, blob_directory=blob_dir)
+        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir)
         writer = blob.get_blob_writer()
         writer.write(blob_bytes)
         await blob.verified.wait()
@@ -191,7 +193,7 @@ class AbstractBlob:
         self._write_blob(verified_bytes)
         self.verified.set()
         if self.blob_completed_callback:
-            self.loop.create_task(self.blob_completed_callback(self))
+            self.blob_completed_callback(self)

     def get_blob_writer(self) -> HashBlobWriter:
         fut = asyncio.Future(loop=self.loop)
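Note: this hunk is the core of the commit: the completion callback is invoked directly instead of being wrapped in a task. Any async follow-up (such as a database write) becomes the callback's own responsibility. A self-contained sketch of the pattern, using only the standard library:

    import asyncio


    async def save_to_db(blob_hash: str) -> None:
        await asyncio.sleep(0)  # stand-in for a real sqlite write
        print(f"saved {blob_hash}")


    def blob_completed(blob_hash: str, loop: asyncio.AbstractEventLoop) -> None:
        # synchronous entry point: schedules the async work and returns at once
        loop.create_task(save_to_db(blob_hash))


    async def main():
        loop = asyncio.get_event_loop()
        blob_completed("deadbeef", loop)  # returns immediately
        await asyncio.sleep(0)            # one loop tick lets the task run


    asyncio.get_event_loop().run_until_complete(main())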
@@ -217,7 +219,6 @@ class AbstractBlob:
             finally:
                 if writer in self.writers:
                     self.writers.remove(writer)
-

        fut.add_done_callback(writer_finished_callback)
        return writer
@@ -227,7 +228,7 @@ class BlobBuffer(AbstractBlob):
     An in-memory only blob
     """
     def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]] = None,
+                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
                  blob_directory: typing.Optional[str] = None):
         self._verified_bytes: typing.Optional[BytesIO] = None
         super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
@@ -265,11 +266,11 @@ class BlobFile(AbstractBlob):
     A blob existing on the local file system
     """
     def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None,
-                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]] = None,
+                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
                  blob_directory: typing.Optional[str] = None):
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
         if not blob_directory or not os.path.isdir(blob_directory):
             raise OSError(f"invalid blob directory '{blob_directory}'")
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
         self.file_path = os.path.join(self.blob_directory, self.blob_hash)
         if self.file_exists:
             file_size = int(os.stat(self.file_path).st_size)
@@ -310,8 +311,12 @@ class BlobFile(AbstractBlob):
         return super().delete()

     @classmethod
-    async def create_from_unencrypted(cls, loop: asyncio.BaseEventLoop, blob_dir: str, key: bytes,
-                                      iv: bytes, unencrypted: bytes, blob_num: int) -> BlobInfo:
+    async def create_from_unencrypted(
+            cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
+            unencrypted: bytes, blob_num: int,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo:
         if not blob_dir or not os.path.isdir(blob_dir):
             raise OSError(f"cannot create blob in directory: '{blob_dir}'")
-        return await super().create_from_unencrypted(loop, blob_dir, key, iv, unencrypted, blob_num)
+        return await super().create_from_unencrypted(
+            loop, blob_dir, key, iv, unencrypted, blob_num, blob_completed_callback
+        )
@@ -6,6 +6,7 @@ from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
 from lbrynet.stream.descriptor import StreamDescriptor

 if typing.TYPE_CHECKING:
+    from lbrynet.conf import Config
     from lbrynet.dht.protocol.data_store import DictDataStore
     from lbrynet.extras.daemon.storage import SQLiteStorage
@@ -13,8 +14,8 @@ log = logging.getLogger(__name__)


 class BlobManager:
-    def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage',
-                 node_data_store: typing.Optional['DictDataStore'] = None, save_blobs: bool = True):
+    def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config',
+                 node_data_store: typing.Optional['DictDataStore'] = None):
         """
         This class stores blobs on the hard disk
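Note: the constructor now takes the whole Config instead of a bare save_blobs flag, and every construction site below changes accordingly. The call shape, with placeholder paths (imports match those used by the tests in this commit):

    import asyncio
    from lbrynet.conf import Config
    from lbrynet.extras.daemon.storage import SQLiteStorage
    from lbrynet.blob.blob_manager import BlobManager

    loop = asyncio.get_event_loop()
    config = Config(save_blobs=True)
    storage = SQLiteStorage(config, ":memory:")
    blob_manager = BlobManager(loop, "/tmp/blobs", storage, config)  # "/tmp/blobs" is a placeholder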
@@ -28,12 +29,29 @@ class BlobManager:
         self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\
             else self._node_data_store.completed_blobs
         self.blobs: typing.Dict[str, AbstractBlob] = {}
-        self._save_blobs = save_blobs
+        self.config = config

-    def get_blob_class(self):
-        if not self._save_blobs:
-            return BlobBuffer
-        return BlobFile
+    def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
+        if self.config.save_blobs:
+            return BlobFile(
+                self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+            )
+        else:
+            if length and is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
+                return BlobFile(
+                    self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+                )
+            return BlobBuffer(
+                self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+            )
+
+    def get_blob(self, blob_hash, length: typing.Optional[int] = None):
+        if blob_hash in self.blobs:
+            if length and self.blobs[blob_hash].length is None:
+                self.blobs[blob_hash].set_length(length)
+        else:
+            self.blobs[blob_hash] = self._get_blob(blob_hash, length)
+        return self.blobs[blob_hash]

     async def setup(self) -> bool:
         def get_files_in_blob_dir() -> typing.Set[str]:
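Note: _get_blob() replaces the old get_blob_class() indirection and adds one subtlety: even with save_blobs disabled, a blob that already exists on disk is served as a BlobFile rather than re-buffered in memory. A condensed, standalone sketch of that decision (toy classes, not the SDK's):

    import os


    class DiskBlob:      # stand-in for BlobFile
        def __init__(self, blob_hash): self.blob_hash = blob_hash


    class MemoryBlob:    # stand-in for BlobBuffer
        def __init__(self, blob_hash): self.blob_hash = blob_hash


    def pick_blob(save_blobs: bool, blob_dir: str, blob_hash: str, length=None):
        if save_blobs:
            return DiskBlob(blob_hash)
        # saving disabled, but reuse a finished blob that is already on disk
        if length and os.path.isfile(os.path.join(blob_dir, blob_hash)):
            return DiskBlob(blob_hash)
        return MemoryBlob(blob_hash)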
@@ -54,28 +72,22 @@ class BlobManager:
             blob.close()
         self.completed_blob_hashes.clear()

-    def get_blob(self, blob_hash, length: typing.Optional[int] = None):
-        if blob_hash in self.blobs:
-            if length and self.blobs[blob_hash].length is None:
-                self.blobs[blob_hash].set_length(length)
-        else:
-            self.blobs[blob_hash] = self.get_blob_class()(self.loop, blob_hash, length, self.blob_completed,
-                                                          self.blob_dir)
-        return self.blobs[blob_hash]
-
     def get_stream_descriptor(self, sd_hash):
         return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash))

-    async def blob_completed(self, blob: AbstractBlob):
+    def blob_completed(self, blob: AbstractBlob):
         if blob.blob_hash is None:
             raise Exception("Blob hash is None")
         if not blob.length:
             raise Exception("Blob has a length of 0")
-        if isinstance(blob, BlobBuffer):  # don't save blob buffers to the db / dont announce them
-            return
-        if blob.blob_hash not in self.completed_blob_hashes:
-            self.completed_blob_hashes.add(blob.blob_hash)
-        await self.storage.add_completed_blob(blob.blob_hash, blob.length)
+        if not blob.get_is_verified():
+            raise Exception("Blob is not verified")
+        if isinstance(blob, BlobFile):
+            if blob.blob_hash not in self.completed_blob_hashes:
+                self.completed_blob_hashes.add(blob.blob_hash)
+            self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=True))
+        else:
+            self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=False))

     def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
         """Returns of the blobhashes_to_check, which are valid"""
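Note: because blob_completed is now synchronous, the database write is detached onto the loop with create_task; BlobFile rows are recorded as finished while BlobBuffer rows stay pending. Callers that need to observe the row must let the loop run at least one tick (the tests below do this with await asyncio.sleep(0)). A sketch of the contract, with a toy manager standing in for the real one:

    import asyncio


    class ToyBlobManager:
        """Illustrative only; mirrors the new synchronous blob_completed contract."""

        def __init__(self, loop: asyncio.AbstractEventLoop, storage):
            self.loop = loop
            self.storage = storage
            self.completed_blob_hashes = set()

        def blob_completed(self, blob_hash: str, length: int, verified: bool, on_disk: bool) -> None:
            if not verified:
                raise Exception("Blob is not verified")
            if on_disk:
                self.completed_blob_hashes.add(blob_hash)
            # fire-and-forget: the row appears only after the loop runs a tick
            self.loop.create_task(self.storage.add_blobs((blob_hash, length), finished=on_disk))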
@@ -294,7 +294,7 @@ class BlobComponent(Component):
         blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
         if not os.path.isdir(blob_dir):
             os.mkdir(blob_dir)
-        self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, data_store, self.conf.save_blobs)
+        self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, self.conf, data_store)
         return await self.blob_manager.setup()

     async def stop(self):
@@ -487,7 +487,7 @@ class UPnPComponent(Component):
         while True:
             if now:
                 await self._maintain_redirects()
-            await asyncio.sleep(360)
+            await asyncio.sleep(360, loop=self.component_manager.loop)

     async def _maintain_redirects(self):
         # setup the gateway if necessary
@@ -300,27 +300,28 @@ class SQLiteStorage(SQLiteMixin):

     # # # # # # # # # blob functions # # # # # # # # #

-    def add_completed_blob(self, blob_hash: str, length: int):
-        def _add_blob(transaction: sqlite3.Connection):
-            transaction.execute(
+    async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int], finished=False):
+        def _add_blobs(transaction: sqlite3.Connection):
+            transaction.executemany(
                 "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
-                (blob_hash, length, 0, 0, "pending", 0, 0)
+                [
+                    (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0)
+                    for blob_hash, length in blob_hashes_and_lengths
+                ]
             )
-            transaction.execute(
-                "update blob set status='finished' where blob.blob_hash=?", (blob_hash, )
-            )
-        return self.db.run(_add_blob)
+            if finished:
+                transaction.executemany(
+                    "update blob set status='finished' where blob.blob_hash=?", [
+                        (blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths
+                    ]
+                )
+        return await self.db.run(_add_blobs)

     def get_blob_status(self, blob_hash: str):
         return self.run_and_return_one_or_none(
             "select status from blob where blob_hash=?", blob_hash
         )

     def add_known_blob(self, blob_hash: str, length: int):
         return self.db.execute(
             "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", (blob_hash, length, 0, 0, "pending", 0, 0)
         )

     def should_announce(self, blob_hash: str):
         return self.run_and_return_one_or_none(
             "select should_announce from blob where blob_hash=?", blob_hash
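Note: add_completed_blob (one row per call, two statements) becomes add_blobs, which accepts any number of (blob_hash, length) tuples and batches them with executemany, choosing 'finished' or 'pending' up front. A self-contained sketch of the same batching against an in-memory database, using a simplified blob table:

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute("create table blob (blob_hash text primary key, length integer, status text)")


    def add_blobs(*blob_hashes_and_lengths, finished=False):
        status = "finished" if finished else "pending"
        db.executemany(
            "insert or ignore into blob values (?, ?, ?)",
            [(blob_hash, length, status) for blob_hash, length in blob_hashes_and_lengths],
        )
        if finished:
            db.executemany(
                "update blob set status='finished' where blob_hash=?",
                [(blob_hash,) for blob_hash, _ in blob_hashes_and_lengths],
            )


    add_blobs(("aa" * 48, 1024), ("bb" * 48, 2048), finished=True)
    print(db.execute("select blob_hash, status from blob").fetchall())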
@@ -109,13 +109,14 @@ class StreamDescriptor:
         return h.hexdigest()

     async def make_sd_blob(self, blob_file_obj: typing.Optional[AbstractBlob] = None,
-                           old_sort: typing.Optional[bool] = False):
+                           old_sort: typing.Optional[bool] = False,
+                           blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None):
         sd_hash = self.calculate_sd_hash() if not old_sort else self.calculate_old_sort_sd_hash()
         if not old_sort:
             sd_data = self.as_json()
         else:
             sd_data = self.old_sort_json()
-        sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_directory=self.blob_dir)
+        sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir)
         if blob_file_obj:
             blob_file_obj.set_length(len(sd_data))
         if not sd_blob.get_is_verified():
@@ -194,11 +195,12 @@ class StreamDescriptor:
         return h.hexdigest()

     @classmethod
-    async def create_stream(cls, loop: asyncio.BaseEventLoop, blob_dir: str,
-                            file_path: str, key: typing.Optional[bytes] = None,
-                            iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None,
-                            old_sort: bool = False) -> 'StreamDescriptor':
-
+    async def create_stream(
+            cls, loop: asyncio.BaseEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None,
+            iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None,
+            old_sort: bool = False,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
+                                                                     None]] = None) -> 'StreamDescriptor':
         blobs: typing.List[BlobInfo] = []

         iv_generator = iv_generator or random_iv_generator()
@@ -207,7 +209,7 @@ class StreamDescriptor:
         for blob_bytes in file_reader(file_path):
             blob_num += 1
             blob_info = await BlobFile.create_from_unencrypted(
-                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num
+                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback
             )
             blobs.append(blob_info)
         blobs.append(
@@ -216,7 +218,7 @@ class StreamDescriptor:
             loop, blob_dir, os.path.basename(file_path), binascii.hexlify(key).decode(), os.path.basename(file_path),
             blobs
         )
-        sd_blob = await descriptor.make_sd_blob(old_sort=old_sort)
+        sd_blob = await descriptor.make_sd_blob(old_sort=old_sort, blob_completed_callback=blob_completed_callback)
         descriptor.sd_hash = sd_blob.blob_hash
         return descriptor
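Note: with the three hunks above, the callback is threaded from create_stream through create_from_unencrypted and make_sd_blob, so every blob produced during publishing (including the sd blob) reports back to the blob manager. A hypothetical call site (setup elided; loop, blob_manager, and file_path are assumed to exist), mirroring how ManagedStream.create() wires things up below:

    from lbrynet.stream.descriptor import StreamDescriptor


    async def publish(loop, blob_manager, file_path):
        descriptor = await StreamDescriptor.create_stream(
            loop, blob_manager.blob_dir, file_path,
            blob_completed_callback=blob_manager.blob_completed,
        )
        return descriptor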
@@ -9,6 +9,7 @@ from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.reflector.client import StreamReflectorClient
 from lbrynet.extras.daemon.storage import StoredStreamClaim
+from lbrynet.blob.blob_file import BlobFile
 if typing.TYPE_CHECKING:
     from lbrynet.conf import Config
     from lbrynet.schema.claim import Claim
@@ -207,15 +208,12 @@ class ManagedStream:
                      file_path: str, key: typing.Optional[bytes] = None,
                      iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
         descriptor = await StreamDescriptor.create_stream(
-            loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator
+            loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+            blob_completed_callback=blob_manager.blob_completed
         )
-        sd_blob = blob_manager.get_blob(descriptor.sd_hash)
         await blob_manager.storage.store_stream(
             blob_manager.get_blob(descriptor.sd_hash), descriptor
         )
-        await blob_manager.blob_completed(sd_blob)
-        for blob in descriptor.blobs[:-1]:
-            await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length))
         row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
                                                                 os.path.dirname(file_path), 0)
         return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
@@ -107,9 +107,11 @@ class CommandTestCase(IntegrationTestCase):

         server_tmp_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, server_tmp_dir)
-        self.server_storage = SQLiteStorage(Config(), ':memory:')
+        self.server_config = Config()
+        self.server_storage = SQLiteStorage(self.server_config, ':memory:')
         await self.server_storage.open()
-        self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage)
+
+        self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage, self.server_config)
         self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
         self.server.start_server(5567, '127.0.0.1')
         await self.server.started_listening.wait()
@@ -142,7 +142,7 @@ class FileCommands(CommandTestCase):
         os.rename(missing_blob.file_path + '__', missing_blob.file_path)
         self.server_blob_manager.blobs.clear()
         missing_blob = self.server_blob_manager.get_blob(missing_blob_hash)
-        await self.server_blob_manager.blob_completed(missing_blob)
+        self.server_blob_manager.blob_completed(missing_blob)
         await asyncio.wait_for(self.wait_files_to_complete(), timeout=1)

     async def test_paid_download(self):
@@ -15,10 +15,17 @@ class TestBlob(AsyncioTestCase):
     blob_bytes = b'1' * ((2 * 2 ** 20) - 1)

     async def asyncSetUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(lambda: shutil.rmtree(self.tmp_dir))
         self.loop = asyncio.get_running_loop()
+        self.config = Config()
+        self.storage = SQLiteStorage(self.config, ":memory:", self.loop)
+        self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage, self.config)
+        await self.storage.open()

     def _get_blob(self, blob_class=AbstractBlob, blob_directory=None):
-        blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=blob_directory)
+        blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), self.blob_manager.blob_completed,
+                          blob_directory=blob_directory)
         self.assertFalse(blob.get_is_verified())
         self.addCleanup(blob.close)
         return blob
@@ -29,6 +36,7 @@ class TestBlob(AsyncioTestCase):
         writer.write(self.blob_bytes)
         await blob.verified.wait()
         self.assertTrue(blob.get_is_verified())
+        await asyncio.sleep(0, loop=self.loop)  # wait for the db save task
         return blob

     async def _test_close_writers_on_finished(self, blob_class=AbstractBlob, blob_directory=None):
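Note: the added await asyncio.sleep(0, loop=self.loop) gives the event loop a single tick, so the task queued by blob_completed can perform its database write before the test asserts on it. A standalone illustration of why that tick matters:

    import asyncio

    ran = []


    async def record():
        ran.append(True)


    async def main():
        loop = asyncio.get_event_loop()
        loop.create_task(record())
        assert not ran          # scheduled, but the task has not run yet
        await asyncio.sleep(0)  # yield one tick to the event loop
        assert ran              # now it has executed


    asyncio.get_event_loop().run_until_complete(main())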
@@ -54,25 +62,39 @@ class TestBlob(AsyncioTestCase):
         other.write(self.blob_bytes)

     def _test_ioerror_if_length_not_set(self, blob_class=AbstractBlob, blob_directory=None):
-        blob = blob_class(self.loop, self.blob_hash, blob_directory=blob_directory)
+        blob = blob_class(
+            self.loop, self.blob_hash, blob_completed_callback=self.blob_manager.blob_completed,
+            blob_directory=blob_directory
+        )
         self.addCleanup(blob.close)
         writer = blob.get_blob_writer()
         with self.assertRaises(IOError):
             writer.write(b'')

     async def _test_invalid_blob_bytes(self, blob_class=AbstractBlob, blob_directory=None):
-        blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=blob_directory)
+        blob = blob_class(
+            self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
+            blob_directory=blob_directory
+        )
         self.addCleanup(blob.close)
         writer = blob.get_blob_writer()
         writer.write(self.blob_bytes[:-4] + b'fake')
         with self.assertRaises(InvalidBlobHashError):
             await writer.finished

+    async def test_add_blob_buffer_to_db(self):
+        blob = await self._test_create_blob(BlobBuffer)
+        db_status = await self.storage.get_blob_status(blob.blob_hash)
+        self.assertEqual(db_status, 'pending')
+
+    async def test_add_blob_file_to_db(self):
+        blob = await self._test_create_blob(BlobFile, self.tmp_dir)
+        db_status = await self.storage.get_blob_status(blob.blob_hash)
+        self.assertEqual(db_status, 'finished')
+
     async def test_invalid_blob_bytes(self):
-        tmp_dir = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
         await self._test_invalid_blob_bytes(BlobBuffer)
-        await self._test_invalid_blob_bytes(BlobFile, tmp_dir)
+        await self._test_invalid_blob_bytes(BlobFile, self.tmp_dir)

     def test_ioerror_if_length_not_set(self):
         tmp_dir = tempfile.mkdtemp()
@@ -113,6 +135,7 @@ class TestBlob(AsyncioTestCase):

     async def test_delete(self):
         blob_buffer = await self._test_create_blob(BlobBuffer)
         self.assertIsInstance(blob_buffer, BlobBuffer)
         self.assertIsNotNone(blob_buffer._verified_bytes)
+        self.assertTrue(blob_buffer.get_is_verified())
         blob_buffer.delete()
@@ -123,6 +146,7 @@ class TestBlob(AsyncioTestCase):
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))

         blob_file = await self._test_create_blob(BlobFile, tmp_dir)
         self.assertIsInstance(blob_file, BlobFile)
         self.assertTrue(os.path.isfile(blob_file.file_path))
+        self.assertTrue(blob_file.get_is_verified())
         blob_file.delete()
@@ -132,17 +156,26 @@ class TestBlob(AsyncioTestCase):
     async def test_delete_corrupt(self):
         tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))
-        blob = BlobFile(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=tmp_dir)
+        blob = BlobFile(
+            self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
+            blob_directory=tmp_dir
+        )
         writer = blob.get_blob_writer()
         writer.write(self.blob_bytes)
         await blob.verified.wait()
         blob.close()
-        blob = BlobFile(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=tmp_dir)
+        blob = BlobFile(
+            self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
+            blob_directory=tmp_dir
+        )
         self.assertTrue(blob.get_is_verified())

         with open(blob.file_path, 'wb+') as f:
             f.write(b'\x00')
-        blob = BlobFile(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=tmp_dir)
+        blob = BlobFile(
+            self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
+            blob_directory=tmp_dir
+        )
         self.assertFalse(blob.get_is_verified())
         self.assertFalse(os.path.isfile(blob.file_path))
@@ -9,46 +9,50 @@ from lbrynet.blob.blob_manager import BlobManager


 class TestBlobManager(AsyncioTestCase):
-    async def test_sync_blob_manager_on_startup(self):
-        loop = asyncio.get_event_loop()
+    async def setup_blob_manager(self, save_blobs=True):
         tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))
+        self.config = Config(save_blobs=save_blobs)
+        self.storage = SQLiteStorage(self.config, os.path.join(tmp_dir, "lbrynet.sqlite"))
+        self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config)
+        await self.storage.open()

-        storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
-        blob_manager = BlobManager(loop, tmp_dir, storage)
+    async def test_sync_blob_file_manager_on_startup(self):
+        await self.setup_blob_manager(save_blobs=True)

         # add a blob file
         blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
         blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
-        with open(os.path.join(blob_manager.blob_dir, blob_hash), 'wb') as f:
+        with open(os.path.join(self.blob_manager.blob_dir, blob_hash), 'wb') as f:
             f.write(blob_bytes)

         # it should not have been added automatically on startup
-        await storage.open()
-        await blob_manager.setup()
-        self.assertSetEqual(blob_manager.completed_blob_hashes, set())
+        await self.blob_manager.setup()
+        self.assertSetEqual(self.blob_manager.completed_blob_hashes, set())

         # make sure we can add the blob
-        await blob_manager.blob_completed(blob_manager.get_blob(blob_hash, len(blob_bytes)))
-        self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash})
+        self.blob_manager.blob_completed(self.blob_manager.get_blob(blob_hash, len(blob_bytes)))
+        await self.blob_manager.storage.add_blobs((blob_hash, len(blob_bytes)), finished=True)
+        self.assertSetEqual(self.blob_manager.completed_blob_hashes, {blob_hash})

         # stop the blob manager and restart it, make sure the blob is there
-        blob_manager.stop()
-        self.assertSetEqual(blob_manager.completed_blob_hashes, set())
-        await blob_manager.setup()
-        self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash})
+        self.blob_manager.stop()
+        self.assertSetEqual(self.blob_manager.completed_blob_hashes, set())
+        await self.blob_manager.setup()
+        self.assertSetEqual(self.blob_manager.completed_blob_hashes, {blob_hash})

         # test that the blob is removed upon the next startup after the file being manually deleted
-        blob_manager.stop()
+        self.blob_manager.stop()

         # manually delete the blob file and restart the blob manager
-        os.remove(os.path.join(blob_manager.blob_dir, blob_hash))
-        await blob_manager.setup()
-        self.assertSetEqual(blob_manager.completed_blob_hashes, set())
+        os.remove(os.path.join(self.blob_manager.blob_dir, blob_hash))
+        await self.blob_manager.setup()
+        self.assertSetEqual(self.blob_manager.completed_blob_hashes, set())

         # check that the deleted blob was updated in the database
         self.assertEqual(
             'pending', (
-                await storage.run_and_return_one_or_none('select status from blob where blob_hash=?', blob_hash)
+                await self.storage.run_and_return_one_or_none('select status from blob where blob_hash=?', blob_hash)
             )
         )
@@ -35,13 +35,13 @@ class BlobExchangeTestBase(AsyncioTestCase):
         self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
                                     reflector_servers=[])
         self.server_storage = SQLiteStorage(self.server_config, os.path.join(self.server_dir, "lbrynet.sqlite"))
-        self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage)
+        self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
         self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')

         self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
                                     reflector_servers=[])
         self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
-        self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage)
+        self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
         self.client_peer_manager = PeerManager(self.loop)
         self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)
@@ -64,6 +64,7 @@ class TestBlobExchange(BlobExchangeTestBase):
         await server_blob.verified.wait()
         self.assertTrue(os.path.isfile(server_blob.file_path))
         self.assertEqual(server_blob.get_is_verified(), True)
+        self.assertTrue(writer.closed())

     async def _test_transfer_blob(self, blob_hash: str):
         client_blob = self.client_blob_manager.get_blob(blob_hash)
@@ -76,7 +77,7 @@ class TestBlobExchange(BlobExchangeTestBase):
         await client_blob.verified.wait()
         self.assertEqual(client_blob.get_is_verified(), True)
         self.assertTrue(downloaded)
-        self.addCleanup(client_blob.close)
+        client_blob.close()

     async def test_transfer_sd_blob(self):
         sd_hash = "3e2706157a59aaa47ef52bc264fce488078b4026c0b9bab649a8f2fe1ecc5e5cad7182a2bb7722460f856831a1ac0f02"
|
|||
|
||||
second_client_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(shutil.rmtree, second_client_dir)
|
||||
|
||||
second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite"))
|
||||
second_client_blob_manager = BlobManager(self.loop, second_client_dir, second_client_storage)
|
||||
second_client_conf = Config()
|
||||
second_client_storage = SQLiteStorage(second_client_conf, os.path.join(second_client_dir, "lbrynet.sqlite"))
|
||||
second_client_blob_manager = BlobManager(
|
||||
self.loop, second_client_dir, second_client_storage, second_client_conf
|
||||
)
|
||||
server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)
|
||||
|
||||
await second_client_storage.open()
|
||||
|
@@ -128,9 +131,12 @@ class TestBlobExchange(BlobExchangeTestBase):

         second_client_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, second_client_dir)
+        second_client_conf = Config()

-        second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite"))
-        second_client_blob_manager = BlobManager(self.loop, second_client_dir, second_client_storage)
+        second_client_storage = SQLiteStorage(second_client_conf, os.path.join(second_client_dir, "lbrynet.sqlite"))
+        second_client_blob_manager = BlobManager(
+            self.loop, second_client_dir, second_client_storage, second_client_conf
+        )
         server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)

         await second_client_storage.open()
@@ -68,17 +68,18 @@ fake_claim_info = {

 class StorageTest(AsyncioTestCase):
     async def asyncSetUp(self):
-        self.storage = SQLiteStorage(Config(), ':memory:')
+        self.conf = Config()
+        self.storage = SQLiteStorage(self.conf, ':memory:')
         self.blob_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.blob_dir)
-        self.blob_manager = BlobManager(asyncio.get_event_loop(), self.blob_dir, self.storage)
+        self.blob_manager = BlobManager(asyncio.get_event_loop(), self.blob_dir, self.storage, self.conf)
         await self.storage.open()

     async def asyncTearDown(self):
         await self.storage.close()

     async def store_fake_blob(self, blob_hash, length=100):
-        await self.storage.add_completed_blob(blob_hash, length)
+        await self.storage.add_blobs((blob_hash, length), finished=True)

     async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
         blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]
@@ -78,8 +78,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
         blob2 = binascii.hexlify(b'2' * 48).decode()

         async with self._test_network_context():
-            await self.storage.add_completed_blob(blob1, 1024)
-            await self.storage.add_completed_blob(blob2, 1024)
+            await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True)
             await self.storage.db.execute(
                 "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)",
                 (blob1, blob2)
@@ -36,8 +36,9 @@ class TestManagedStream(BlobExchangeTestBase):

     async def setup_stream(self, blob_count: int = 10):
         await self.create_stream(blob_count)
-        self.stream = ManagedStream(self.loop, self.client_config, self.client_blob_manager, self.sd_hash,
-                                    self.client_dir)
+        self.stream = ManagedStream(
+            self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir
+        )

     async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None):
         await self.setup_stream(blob_count)
@@ -18,16 +18,18 @@ class TestStreamAssembler(AsyncioTestCase):

         tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))
-        self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
+        self.conf = Config()
+        self.storage = SQLiteStorage(self.conf, os.path.join(tmp_dir, "lbrynet.sqlite"))
         await self.storage.open()
-        self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage)
+        self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.conf)
         self.stream_manager = StreamManager(self.loop, Config(), self.blob_manager, None, self.storage, None)

         server_tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(server_tmp_dir))
-        self.server_storage = SQLiteStorage(Config(), os.path.join(server_tmp_dir, "lbrynet.sqlite"))
+        self.server_conf = Config()
+        self.server_storage = SQLiteStorage(self.server_conf, os.path.join(server_tmp_dir, "lbrynet.sqlite"))
         await self.server_storage.open()
-        self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage)
+        self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage, self.server_conf)

         download_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(download_dir))
@@ -20,9 +20,10 @@ class TestStreamDescriptor(AsyncioTestCase):
         self.cleartext = os.urandom(20000000)
         self.tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(self.tmp_dir))
-        self.storage = SQLiteStorage(Config(), ":memory:")
+        self.conf = Config()
+        self.storage = SQLiteStorage(self.conf, ":memory:")
         await self.storage.open()
-        self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage)
+        self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage, self.conf)

         self.file_path = os.path.join(self.tmp_dir, "test_file")
         with open(self.file_path, 'wb') as f:
@@ -83,9 +84,10 @@ class TestRecoverOldStreamDescriptors(AsyncioTestCase):
         loop = asyncio.get_event_loop()
         tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))
-        storage = SQLiteStorage(Config(), ":memory:")
+        self.conf = Config()
+        storage = SQLiteStorage(self.conf, ":memory:")
         await storage.open()
-        blob_manager = BlobManager(loop, tmp_dir, storage)
+        blob_manager = BlobManager(loop, tmp_dir, storage, self.conf)

         sd_bytes = b'{"stream_name": "4f62616d6120446f6e6b65792d322e73746c", "blobs": [{"length": 1153488, "blob_num' \
                    b'": 0, "blob_hash": "9fa32a249ce3f2d4e46b78599800f368b72f2a7f22b81df443c7f6bdbef496bd61b4c0079c7' \