diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 445a8f5da..a566652e2 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -73,7 +73,7 @@ class AbstractBlob: ] def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None, - blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]] = None, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None, blob_directory: typing.Optional[str] = None): self.loop = loop self.blob_hash = blob_hash @@ -170,15 +170,17 @@ class AbstractBlob: return decrypt_blob_bytes(reader, self.length, key, iv) @classmethod - async def create_from_unencrypted(cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, - iv: bytes, unencrypted: bytes, blob_num: int) -> BlobInfo: + async def create_from_unencrypted( + cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, + unencrypted: bytes, blob_num: int, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo: """ Create an encrypted BlobFile from plaintext bytes """ blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted) length = len(blob_bytes) - blob = cls(loop, blob_hash, length, blob_directory=blob_dir) + blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir) writer = blob.get_blob_writer() writer.write(blob_bytes) await blob.verified.wait() @@ -191,7 +193,7 @@ class AbstractBlob: self._write_blob(verified_bytes) self.verified.set() if self.blob_completed_callback: - self.loop.create_task(self.blob_completed_callback(self)) + self.blob_completed_callback(self) def get_blob_writer(self) -> HashBlobWriter: fut = asyncio.Future(loop=self.loop) @@ -217,7 +219,6 @@ class AbstractBlob: finally: if writer in self.writers: self.writers.remove(writer) - fut.add_done_callback(writer_finished_callback) return writer @@ -227,7 +228,7 @@ class BlobBuffer(AbstractBlob): An in-memory only blob """ def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None, - blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]] = None, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None, blob_directory: typing.Optional[str] = None): self._verified_bytes: typing.Optional[BytesIO] = None super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory) @@ -265,11 +266,11 @@ class BlobFile(AbstractBlob): A blob existing on the local file system """ def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None, - blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], typing.Awaitable]] = None, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None, blob_directory: typing.Optional[str] = None): + super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory) if not blob_directory or not os.path.isdir(blob_directory): raise OSError(f"invalid blob directory '{blob_directory}'") - super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory) self.file_path = os.path.join(self.blob_directory, self.blob_hash) if self.file_exists: file_size = int(os.stat(self.file_path).st_size) @@ -310,8 +311,12 @@ class BlobFile(AbstractBlob): return super().delete() @classmethod - async def create_from_unencrypted(cls, loop: asyncio.BaseEventLoop, blob_dir: str, key: bytes, - iv: bytes, unencrypted: bytes, blob_num: int) -> BlobInfo: + async def create_from_unencrypted( + cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, + unencrypted: bytes, blob_num: int, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo: if not blob_dir or not os.path.isdir(blob_dir): raise OSError(f"cannot create blob in directory: '{blob_dir}'") - return await super().create_from_unencrypted(loop, blob_dir, key, iv, unencrypted, blob_num) + return await super().create_from_unencrypted( + loop, blob_dir, key, iv, unencrypted, blob_num, blob_completed_callback + ) diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index c45ec5c58..c86b552e1 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -6,6 +6,7 @@ from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, Abst from lbrynet.stream.descriptor import StreamDescriptor if typing.TYPE_CHECKING: + from lbrynet.conf import Config from lbrynet.dht.protocol.data_store import DictDataStore from lbrynet.extras.daemon.storage import SQLiteStorage @@ -13,8 +14,8 @@ log = logging.getLogger(__name__) class BlobManager: - def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage', - node_data_store: typing.Optional['DictDataStore'] = None, save_blobs: bool = True): + def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config', + node_data_store: typing.Optional['DictDataStore'] = None): """ This class stores blobs on the hard disk @@ -28,12 +29,29 @@ class BlobManager: self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\ else self._node_data_store.completed_blobs self.blobs: typing.Dict[str, AbstractBlob] = {} - self._save_blobs = save_blobs + self.config = config - def get_blob_class(self): - if not self._save_blobs: - return BlobBuffer - return BlobFile + def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None): + if self.config.save_blobs: + return BlobFile( + self.loop, blob_hash, length, self.blob_completed, self.blob_dir + ) + else: + if length and is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash)): + return BlobFile( + self.loop, blob_hash, length, self.blob_completed, self.blob_dir + ) + return BlobBuffer( + self.loop, blob_hash, length, self.blob_completed, self.blob_dir + ) + + def get_blob(self, blob_hash, length: typing.Optional[int] = None): + if blob_hash in self.blobs: + if length and self.blobs[blob_hash].length is None: + self.blobs[blob_hash].set_length(length) + else: + self.blobs[blob_hash] = self._get_blob(blob_hash, length) + return self.blobs[blob_hash] async def setup(self) -> bool: def get_files_in_blob_dir() -> typing.Set[str]: @@ -54,28 +72,22 @@ class BlobManager: blob.close() self.completed_blob_hashes.clear() - def get_blob(self, blob_hash, length: typing.Optional[int] = None): - if blob_hash in self.blobs: - if length and self.blobs[blob_hash].length is None: - self.blobs[blob_hash].set_length(length) - else: - self.blobs[blob_hash] = self.get_blob_class()(self.loop, blob_hash, length, self.blob_completed, - self.blob_dir) - return self.blobs[blob_hash] - def get_stream_descriptor(self, sd_hash): return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash)) - async def blob_completed(self, blob: AbstractBlob): + def blob_completed(self, blob: AbstractBlob): if blob.blob_hash is None: raise Exception("Blob hash is None") if not blob.length: raise Exception("Blob has a length of 0") - if isinstance(blob, BlobBuffer): # don't save blob buffers to the db / dont announce them - return - if blob.blob_hash not in self.completed_blob_hashes: - self.completed_blob_hashes.add(blob.blob_hash) - await self.storage.add_completed_blob(blob.blob_hash, blob.length) + if not blob.get_is_verified(): + raise Exception("Blob is not verified") + if isinstance(blob, BlobFile): + if blob.blob_hash not in self.completed_blob_hashes: + self.completed_blob_hashes.add(blob.blob_hash) + self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=True)) + else: + self.loop.create_task(self.storage.add_blobs((blob.blob_hash, blob.length), finished=False)) def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]: """Returns of the blobhashes_to_check, which are valid""" diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 54d5c1390..fb3a6485f 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -294,7 +294,7 @@ class BlobComponent(Component): blob_dir = os.path.join(self.conf.data_dir, 'blobfiles') if not os.path.isdir(blob_dir): os.mkdir(blob_dir) - self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, data_store, self.conf.save_blobs) + self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, self.conf, data_store) return await self.blob_manager.setup() async def stop(self): @@ -487,7 +487,7 @@ class UPnPComponent(Component): while True: if now: await self._maintain_redirects() - await asyncio.sleep(360) + await asyncio.sleep(360, loop=self.component_manager.loop) async def _maintain_redirects(self): # setup the gateway if necessary diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 93830eb5b..02612991a 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -300,27 +300,28 @@ class SQLiteStorage(SQLiteMixin): # # # # # # # # # blob functions # # # # # # # # # - def add_completed_blob(self, blob_hash: str, length: int): - def _add_blob(transaction: sqlite3.Connection): - transaction.execute( + async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int], finished=False): + def _add_blobs(transaction: sqlite3.Connection): + transaction.executemany( "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", - (blob_hash, length, 0, 0, "pending", 0, 0) + [ + (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0) + for blob_hash, length in blob_hashes_and_lengths + ] ) - transaction.execute( - "update blob set status='finished' where blob.blob_hash=?", (blob_hash, ) - ) - return self.db.run(_add_blob) + if finished: + transaction.executemany( + "update blob set status='finished' where blob.blob_hash=?", [ + (blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths + ] + ) + return await self.db.run(_add_blobs) def get_blob_status(self, blob_hash: str): return self.run_and_return_one_or_none( "select status from blob where blob_hash=?", blob_hash ) - def add_known_blob(self, blob_hash: str, length: int): - return self.db.execute( - "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", (blob_hash, length, 0, 0, "pending", 0, 0) - ) - def should_announce(self, blob_hash: str): return self.run_and_return_one_or_none( "select should_announce from blob where blob_hash=?", blob_hash diff --git a/lbrynet/stream/descriptor.py b/lbrynet/stream/descriptor.py index e770a3bcd..55f2135b8 100644 --- a/lbrynet/stream/descriptor.py +++ b/lbrynet/stream/descriptor.py @@ -109,13 +109,14 @@ class StreamDescriptor: return h.hexdigest() async def make_sd_blob(self, blob_file_obj: typing.Optional[AbstractBlob] = None, - old_sort: typing.Optional[bool] = False): + old_sort: typing.Optional[bool] = False, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None): sd_hash = self.calculate_sd_hash() if not old_sort else self.calculate_old_sort_sd_hash() if not old_sort: sd_data = self.as_json() else: sd_data = self.old_sort_json() - sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_directory=self.blob_dir) + sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir) if blob_file_obj: blob_file_obj.set_length(len(sd_data)) if not sd_blob.get_is_verified(): @@ -194,11 +195,12 @@ class StreamDescriptor: return h.hexdigest() @classmethod - async def create_stream(cls, loop: asyncio.BaseEventLoop, blob_dir: str, - file_path: str, key: typing.Optional[bytes] = None, - iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None, - old_sort: bool = False) -> 'StreamDescriptor': - + async def create_stream( + cls, loop: asyncio.BaseEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None, + iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None, + old_sort: bool = False, + blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], + None]] = None) -> 'StreamDescriptor': blobs: typing.List[BlobInfo] = [] iv_generator = iv_generator or random_iv_generator() @@ -207,7 +209,7 @@ class StreamDescriptor: for blob_bytes in file_reader(file_path): blob_num += 1 blob_info = await BlobFile.create_from_unencrypted( - loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num + loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback ) blobs.append(blob_info) blobs.append( @@ -216,7 +218,7 @@ class StreamDescriptor: loop, blob_dir, os.path.basename(file_path), binascii.hexlify(key).decode(), os.path.basename(file_path), blobs ) - sd_blob = await descriptor.make_sd_blob(old_sort=old_sort) + sd_blob = await descriptor.make_sd_blob(old_sort=old_sort, blob_completed_callback=blob_completed_callback) descriptor.sd_hash = sd_blob.blob_hash return descriptor diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index cbc3d6be1..e70052b3b 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -9,6 +9,7 @@ from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.reflector.client import StreamReflectorClient from lbrynet.extras.daemon.storage import StoredStreamClaim +from lbrynet.blob.blob_file import BlobFile if typing.TYPE_CHECKING: from lbrynet.conf import Config from lbrynet.schema.claim import Claim @@ -207,15 +208,12 @@ class ManagedStream: file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream': descriptor = await StreamDescriptor.create_stream( - loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator + loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, + blob_completed_callback=blob_manager.blob_completed ) - sd_blob = blob_manager.get_blob(descriptor.sd_hash) await blob_manager.storage.store_stream( blob_manager.get_blob(descriptor.sd_hash), descriptor ) - await blob_manager.blob_completed(sd_blob) - for blob in descriptor.blobs[:-1]: - await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length)) row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0) return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path), diff --git a/lbrynet/testcase.py b/lbrynet/testcase.py index f20e86bf2..80e121e5a 100644 --- a/lbrynet/testcase.py +++ b/lbrynet/testcase.py @@ -107,9 +107,11 @@ class CommandTestCase(IntegrationTestCase): server_tmp_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, server_tmp_dir) - self.server_storage = SQLiteStorage(Config(), ':memory:') + self.server_config = Config() + self.server_storage = SQLiteStorage(self.server_config, ':memory:') await self.server_storage.open() - self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage) + + self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage, self.server_config) self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') self.server.start_server(5567, '127.0.0.1') await self.server.started_listening.wait() diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py index 3df876dcc..0357721dc 100644 --- a/tests/integration/test_file_commands.py +++ b/tests/integration/test_file_commands.py @@ -142,7 +142,7 @@ class FileCommands(CommandTestCase): os.rename(missing_blob.file_path + '__', missing_blob.file_path) self.server_blob_manager.blobs.clear() missing_blob = self.server_blob_manager.get_blob(missing_blob_hash) - await self.server_blob_manager.blob_completed(missing_blob) + self.server_blob_manager.blob_completed(missing_blob) await asyncio.wait_for(self.wait_files_to_complete(), timeout=1) async def test_paid_download(self): diff --git a/tests/unit/blob/test_blob_file.py b/tests/unit/blob/test_blob_file.py index 5933a906d..90b5d9ce9 100644 --- a/tests/unit/blob/test_blob_file.py +++ b/tests/unit/blob/test_blob_file.py @@ -15,10 +15,17 @@ class TestBlob(AsyncioTestCase): blob_bytes = b'1' * ((2 * 2 ** 20) - 1) async def asyncSetUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(lambda: shutil.rmtree(self.tmp_dir)) self.loop = asyncio.get_running_loop() + self.config = Config() + self.storage = SQLiteStorage(self.config, ":memory:", self.loop) + self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage, self.config) + await self.storage.open() def _get_blob(self, blob_class=AbstractBlob, blob_directory=None): - blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=blob_directory) + blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), self.blob_manager.blob_completed, + blob_directory=blob_directory) self.assertFalse(blob.get_is_verified()) self.addCleanup(blob.close) return blob @@ -29,6 +36,7 @@ class TestBlob(AsyncioTestCase): writer.write(self.blob_bytes) await blob.verified.wait() self.assertTrue(blob.get_is_verified()) + await asyncio.sleep(0, loop=self.loop) # wait for the db save task return blob async def _test_close_writers_on_finished(self, blob_class=AbstractBlob, blob_directory=None): @@ -54,25 +62,39 @@ class TestBlob(AsyncioTestCase): other.write(self.blob_bytes) def _test_ioerror_if_length_not_set(self, blob_class=AbstractBlob, blob_directory=None): - blob = blob_class(self.loop, self.blob_hash, blob_directory=blob_directory) + blob = blob_class( + self.loop, self.blob_hash, blob_completed_callback=self.blob_manager.blob_completed, + blob_directory=blob_directory + ) self.addCleanup(blob.close) writer = blob.get_blob_writer() with self.assertRaises(IOError): writer.write(b'') async def _test_invalid_blob_bytes(self, blob_class=AbstractBlob, blob_directory=None): - blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=blob_directory) + blob = blob_class( + self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed, + blob_directory=blob_directory + ) self.addCleanup(blob.close) writer = blob.get_blob_writer() writer.write(self.blob_bytes[:-4] + b'fake') with self.assertRaises(InvalidBlobHashError): await writer.finished + async def test_add_blob_buffer_to_db(self): + blob = await self._test_create_blob(BlobBuffer) + db_status = await self.storage.get_blob_status(blob.blob_hash) + self.assertEqual(db_status, 'pending') + + async def test_add_blob_file_to_db(self): + blob = await self._test_create_blob(BlobFile, self.tmp_dir) + db_status = await self.storage.get_blob_status(blob.blob_hash) + self.assertEqual(db_status, 'finished') + async def test_invalid_blob_bytes(self): - tmp_dir = tempfile.mkdtemp() - self.addCleanup(lambda: shutil.rmtree(tmp_dir)) await self._test_invalid_blob_bytes(BlobBuffer) - await self._test_invalid_blob_bytes(BlobFile, tmp_dir) + await self._test_invalid_blob_bytes(BlobFile, self.tmp_dir) def test_ioerror_if_length_not_set(self): tmp_dir = tempfile.mkdtemp() @@ -113,6 +135,7 @@ class TestBlob(AsyncioTestCase): async def test_delete(self): blob_buffer = await self._test_create_blob(BlobBuffer) + self.assertIsInstance(blob_buffer, BlobBuffer) self.assertIsNotNone(blob_buffer._verified_bytes) self.assertTrue(blob_buffer.get_is_verified()) blob_buffer.delete() @@ -123,6 +146,7 @@ class TestBlob(AsyncioTestCase): self.addCleanup(lambda: shutil.rmtree(tmp_dir)) blob_file = await self._test_create_blob(BlobFile, tmp_dir) + self.assertIsInstance(blob_file, BlobFile) self.assertTrue(os.path.isfile(blob_file.file_path)) self.assertTrue(blob_file.get_is_verified()) blob_file.delete() @@ -132,17 +156,26 @@ class TestBlob(AsyncioTestCase): async def test_delete_corrupt(self): tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) - blob = BlobFile(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=tmp_dir) + blob = BlobFile( + self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed, + blob_directory=tmp_dir + ) writer = blob.get_blob_writer() writer.write(self.blob_bytes) await blob.verified.wait() blob.close() - blob = BlobFile(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=tmp_dir) + blob = BlobFile( + self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed, + blob_directory=tmp_dir + ) self.assertTrue(blob.get_is_verified()) with open(blob.file_path, 'wb+') as f: f.write(b'\x00') - blob = BlobFile(self.loop, self.blob_hash, len(self.blob_bytes), blob_directory=tmp_dir) + blob = BlobFile( + self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed, + blob_directory=tmp_dir + ) self.assertFalse(blob.get_is_verified()) self.assertFalse(os.path.isfile(blob.file_path)) diff --git a/tests/unit/blob/test_blob_manager.py b/tests/unit/blob/test_blob_manager.py index 1d1c6752b..1f16b8480 100644 --- a/tests/unit/blob/test_blob_manager.py +++ b/tests/unit/blob/test_blob_manager.py @@ -9,46 +9,50 @@ from lbrynet.blob.blob_manager import BlobManager class TestBlobManager(AsyncioTestCase): - async def test_sync_blob_manager_on_startup(self): - loop = asyncio.get_event_loop() + async def setup_blob_manager(self, save_blobs=True): tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) + self.config = Config(save_blobs=save_blobs) + self.storage = SQLiteStorage(self.config, os.path.join(tmp_dir, "lbrynet.sqlite")) + self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config) + await self.storage.open() - storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite")) - blob_manager = BlobManager(loop, tmp_dir, storage) + async def test_sync_blob_file_manager_on_startup(self): + await self.setup_blob_manager(save_blobs=True) # add a blob file blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" blob_bytes = b'1' * ((2 * 2 ** 20) - 1) - with open(os.path.join(blob_manager.blob_dir, blob_hash), 'wb') as f: + with open(os.path.join(self.blob_manager.blob_dir, blob_hash), 'wb') as f: f.write(blob_bytes) # it should not have been added automatically on startup - await storage.open() - await blob_manager.setup() - self.assertSetEqual(blob_manager.completed_blob_hashes, set()) + + await self.blob_manager.setup() + self.assertSetEqual(self.blob_manager.completed_blob_hashes, set()) # make sure we can add the blob - await blob_manager.blob_completed(blob_manager.get_blob(blob_hash, len(blob_bytes))) - self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash}) + self.blob_manager.blob_completed(self.blob_manager.get_blob(blob_hash, len(blob_bytes))) + await self.blob_manager.storage.add_blobs((blob_hash, len(blob_bytes)), finished=True) + self.assertSetEqual(self.blob_manager.completed_blob_hashes, {blob_hash}) # stop the blob manager and restart it, make sure the blob is there - blob_manager.stop() - self.assertSetEqual(blob_manager.completed_blob_hashes, set()) - await blob_manager.setup() - self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash}) + self.blob_manager.stop() + self.assertSetEqual(self.blob_manager.completed_blob_hashes, set()) + await self.blob_manager.setup() + self.assertSetEqual(self.blob_manager.completed_blob_hashes, {blob_hash}) # test that the blob is removed upon the next startup after the file being manually deleted - blob_manager.stop() + self.blob_manager.stop() # manually delete the blob file and restart the blob manager - os.remove(os.path.join(blob_manager.blob_dir, blob_hash)) - await blob_manager.setup() - self.assertSetEqual(blob_manager.completed_blob_hashes, set()) + os.remove(os.path.join(self.blob_manager.blob_dir, blob_hash)) + await self.blob_manager.setup() + self.assertSetEqual(self.blob_manager.completed_blob_hashes, set()) # check that the deleted blob was updated in the database self.assertEqual( 'pending', ( - await storage.run_and_return_one_or_none('select status from blob where blob_hash=?', blob_hash) + await self.storage.run_and_return_one_or_none('select status from blob where blob_hash=?', blob_hash) ) ) diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 8577b0509..58d124951 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -35,13 +35,13 @@ class BlobExchangeTestBase(AsyncioTestCase): self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir, reflector_servers=[]) self.server_storage = SQLiteStorage(self.server_config, os.path.join(self.server_dir, "lbrynet.sqlite")) - self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage) + self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config) self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir, reflector_servers=[]) self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite")) - self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage) + self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config) self.client_peer_manager = PeerManager(self.loop) self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) @@ -64,6 +64,7 @@ class TestBlobExchange(BlobExchangeTestBase): await server_blob.verified.wait() self.assertTrue(os.path.isfile(server_blob.file_path)) self.assertEqual(server_blob.get_is_verified(), True) + self.assertTrue(writer.closed()) async def _test_transfer_blob(self, blob_hash: str): client_blob = self.client_blob_manager.get_blob(blob_hash) @@ -76,7 +77,7 @@ class TestBlobExchange(BlobExchangeTestBase): await client_blob.verified.wait() self.assertEqual(client_blob.get_is_verified(), True) self.assertTrue(downloaded) - self.addCleanup(client_blob.close) + client_blob.close() async def test_transfer_sd_blob(self): sd_hash = "3e2706157a59aaa47ef52bc264fce488078b4026c0b9bab649a8f2fe1ecc5e5cad7182a2bb7722460f856831a1ac0f02" @@ -96,9 +97,11 @@ class TestBlobExchange(BlobExchangeTestBase): second_client_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, second_client_dir) - - second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite")) - second_client_blob_manager = BlobManager(self.loop, second_client_dir, second_client_storage) + second_client_conf = Config() + second_client_storage = SQLiteStorage(second_client_conf, os.path.join(second_client_dir, "lbrynet.sqlite")) + second_client_blob_manager = BlobManager( + self.loop, second_client_dir, second_client_storage, second_client_conf + ) server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) await second_client_storage.open() @@ -128,9 +131,12 @@ class TestBlobExchange(BlobExchangeTestBase): second_client_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, second_client_dir) + second_client_conf = Config() - second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite")) - second_client_blob_manager = BlobManager(self.loop, second_client_dir, second_client_storage) + second_client_storage = SQLiteStorage(second_client_conf, os.path.join(second_client_dir, "lbrynet.sqlite")) + second_client_blob_manager = BlobManager( + self.loop, second_client_dir, second_client_storage, second_client_conf + ) server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) await second_client_storage.open() diff --git a/tests/unit/database/test_SQLiteStorage.py b/tests/unit/database/test_SQLiteStorage.py index a674468c3..3a7564faa 100644 --- a/tests/unit/database/test_SQLiteStorage.py +++ b/tests/unit/database/test_SQLiteStorage.py @@ -68,17 +68,18 @@ fake_claim_info = { class StorageTest(AsyncioTestCase): async def asyncSetUp(self): - self.storage = SQLiteStorage(Config(), ':memory:') + self.conf = Config() + self.storage = SQLiteStorage(self.conf, ':memory:') self.blob_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.blob_dir) - self.blob_manager = BlobManager(asyncio.get_event_loop(), self.blob_dir, self.storage) + self.blob_manager = BlobManager(asyncio.get_event_loop(), self.blob_dir, self.storage, self.conf) await self.storage.open() async def asyncTearDown(self): await self.storage.close() async def store_fake_blob(self, blob_hash, length=100): - await self.storage.add_completed_blob(blob_hash, length) + await self.storage.add_blobs((blob_hash, length), finished=True) async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"): blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())] diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 654484af7..85dcd0946 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -78,8 +78,7 @@ class TestBlobAnnouncer(AsyncioTestCase): blob2 = binascii.hexlify(b'2' * 48).decode() async with self._test_network_context(): - await self.storage.add_completed_blob(blob1, 1024) - await self.storage.add_completed_blob(blob2, 1024) + await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True) await self.storage.db.execute( "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)", (blob1, blob2) diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py index 2039f1cbd..7b8d92ff7 100644 --- a/tests/unit/stream/test_managed_stream.py +++ b/tests/unit/stream/test_managed_stream.py @@ -36,8 +36,9 @@ class TestManagedStream(BlobExchangeTestBase): async def setup_stream(self, blob_count: int = 10): await self.create_stream(blob_count) - self.stream = ManagedStream(self.loop, self.client_config, self.client_blob_manager, self.sd_hash, - self.client_dir) + self.stream = ManagedStream( + self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir + ) async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None): await self.setup_stream(blob_count) diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 6626a4a53..b5cdf2960 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -18,16 +18,18 @@ class TestStreamAssembler(AsyncioTestCase): tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) - self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite")) + self.conf = Config() + self.storage = SQLiteStorage(self.conf, os.path.join(tmp_dir, "lbrynet.sqlite")) await self.storage.open() - self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage) + self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.conf) self.stream_manager = StreamManager(self.loop, Config(), self.blob_manager, None, self.storage, None) server_tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(server_tmp_dir)) - self.server_storage = SQLiteStorage(Config(), os.path.join(server_tmp_dir, "lbrynet.sqlite")) + self.server_conf = Config() + self.server_storage = SQLiteStorage(self.server_conf, os.path.join(server_tmp_dir, "lbrynet.sqlite")) await self.server_storage.open() - self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage) + self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage, self.server_conf) download_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(download_dir)) diff --git a/tests/unit/stream/test_stream_descriptor.py b/tests/unit/stream/test_stream_descriptor.py index 7cadf3ffa..479ab48ac 100644 --- a/tests/unit/stream/test_stream_descriptor.py +++ b/tests/unit/stream/test_stream_descriptor.py @@ -20,9 +20,10 @@ class TestStreamDescriptor(AsyncioTestCase): self.cleartext = os.urandom(20000000) self.tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(self.tmp_dir)) - self.storage = SQLiteStorage(Config(), ":memory:") + self.conf = Config() + self.storage = SQLiteStorage(self.conf, ":memory:") await self.storage.open() - self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage) + self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage, self.conf) self.file_path = os.path.join(self.tmp_dir, "test_file") with open(self.file_path, 'wb') as f: @@ -83,9 +84,10 @@ class TestRecoverOldStreamDescriptors(AsyncioTestCase): loop = asyncio.get_event_loop() tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) - storage = SQLiteStorage(Config(), ":memory:") + self.conf = Config() + storage = SQLiteStorage(self.conf, ":memory:") await storage.open() - blob_manager = BlobManager(loop, tmp_dir, storage) + blob_manager = BlobManager(loop, tmp_dir, storage, self.conf) sd_bytes = b'{"stream_name": "4f62616d6120446f6e6b65792d322e73746c", "blobs": [{"length": 1153488, "blob_num' \ b'": 0, "blob_hash": "9fa32a249ce3f2d4e46b78599800f368b72f2a7f22b81df443c7f6bdbef496bd61b4c0079c7' \