From 39737c790ff85d55cfb7ade71dde01b83f166a9f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 14 Feb 2019 12:36:18 -0500 Subject: [PATCH] sync blob files in database with those in directory on startup --- lbrynet/blob/blob_manager.py | 18 +++++--- lbrynet/extras/daemon/Components.py | 4 +- lbrynet/extras/daemon/storage.py | 33 ++++++++++++-- tests/unit/blob/test_blob_manager.py | 54 +++++++++++++++++++++++ tests/unit/database/test_SQLiteStorage.py | 3 +- 5 files changed, 99 insertions(+), 13 deletions(-) create mode 100644 tests/unit/blob/test_blob_manager.py diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 84d0daf59..84379b67f 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -30,13 +30,21 @@ class BlobFileManager: self.blobs: typing.Dict[str, BlobFile] = {} async def setup(self) -> bool: - def initialize_blob_hashes(): - self.completed_blob_hashes.update( + def get_files_in_blob_dir() -> typing.Set[str]: + return { item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name) - ) - await self.loop.run_in_executor(None, initialize_blob_hashes) + } + + in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir) + self.completed_blob_hashes.update(await self.storage.sync_missing_blobs(in_blobfiles_dir)) return True + def stop(self): + while self.blobs: + _, blob = self.blobs.popitem() + blob.close() + self.completed_blob_hashes.clear() + def get_blob(self, blob_hash, length: typing.Optional[int] = None): if blob_hash in self.blobs: if length and self.blobs[blob_hash].length is None: @@ -55,7 +63,7 @@ class BlobFileManager: raise Exception("Blob has a length of 0") if blob.blob_hash not in self.completed_blob_hashes: self.completed_blob_hashes.add(blob.blob_hash) - await self.storage.add_completed_blob(blob.blob_hash) + await self.storage.add_completed_blob(blob.blob_hash, blob.length) def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]: """Returns of the blobhashes_to_check, which are valid""" diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 530974da8..c7c1e3af1 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -321,9 +321,7 @@ class BlobComponent(Component): return await self.blob_manager.setup() async def stop(self): - while self.blob_manager and self.blob_manager.blobs: - _, blob = self.blob_manager.blobs.popitem() - blob.close() + self.blob_manager.stop() async def get_status(self): count = 0 diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 68e93145c..8fed89d7f 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -255,9 +255,16 @@ class SQLiteStorage(SQLiteMixin): # # # # # # # # # blob functions # # # # # # # # # - def add_completed_blob(self, blob_hash: str): - log.debug("Adding a completed blob. blob_hash=%s", blob_hash) - return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, )) + def add_completed_blob(self, blob_hash: str, length: int): + def _add_blob(transaction: sqlite3.Connection): + transaction.execute( + "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", + (blob_hash, length, 0, 0, "pending", 0, 0) + ) + transaction.execute( + "update blob set status='finished' where blob.blob_hash=?", (blob_hash, ) + ) + return self.db.run(_add_blob) def get_blob_status(self, blob_hash: str): return self.run_and_return_one_or_none( @@ -351,6 +358,26 @@ class SQLiteStorage(SQLiteMixin): def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") + def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]: + def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]: + to_update = [ + (blob_hash, ) + for (blob_hash, ) in transaction.execute("select blob_hash from blob where status='finished'") + if blob_hash not in blob_files + ] + transaction.executemany( + "update blob set status='pending' where blob_hash=?", + to_update + ) + return { + blob_hash + for blob_hash, in _batched_select( + transaction, "select blob_hash from blob where status='finished' and blob_hash in {}", + list(blob_files) + ) + } + return self.db.run(_sync_blobs) + # # # # # # # # # stream functions # # # # # # # # # async def stream_exists(self, sd_hash: str) -> bool: diff --git a/tests/unit/blob/test_blob_manager.py b/tests/unit/blob/test_blob_manager.py new file mode 100644 index 000000000..a181c3874 --- /dev/null +++ b/tests/unit/blob/test_blob_manager.py @@ -0,0 +1,54 @@ +import asyncio +import tempfile +import shutil +import os +from torba.testcase import AsyncioTestCase +from lbrynet.conf import Config +from lbrynet.extras.daemon.storage import SQLiteStorage +from lbrynet.blob.blob_manager import BlobFileManager + + +class TestBlobManager(AsyncioTestCase): + async def test_sync_blob_manager_on_startup(self): + loop = asyncio.get_event_loop() + tmp_dir = tempfile.mkdtemp() + self.addCleanup(lambda: shutil.rmtree(tmp_dir)) + + storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite")) + blob_manager = BlobFileManager(loop, tmp_dir, storage) + + # add a blob file + blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" + blob_bytes = b'1' * ((2 * 2 ** 20) - 1) + with open(os.path.join(blob_manager.blob_dir, blob_hash), 'wb') as f: + f.write(blob_bytes) + + # it should not have been added automatically on startup + await storage.open() + await blob_manager.setup() + self.assertSetEqual(blob_manager.completed_blob_hashes, set()) + + # make sure we can add the blob + await blob_manager.blob_completed(blob_manager.get_blob(blob_hash, len(blob_bytes))) + self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash}) + + # stop the blob manager and restart it, make sure the blob is there + blob_manager.stop() + self.assertSetEqual(blob_manager.completed_blob_hashes, set()) + await blob_manager.setup() + self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash}) + + # test that the blob is removed upon the next startup after the file being manually deleted + blob_manager.stop() + + # manually delete the blob file and restart the blob manager + os.remove(os.path.join(blob_manager.blob_dir, blob_hash)) + await blob_manager.setup() + self.assertSetEqual(blob_manager.completed_blob_hashes, set()) + + # check that the deleted blob was updated in the database + self.assertEqual( + 'pending', ( + await storage.run_and_return_one_or_none('select status from blob where blob_hash=?', blob_hash) + ) + ) diff --git a/tests/unit/database/test_SQLiteStorage.py b/tests/unit/database/test_SQLiteStorage.py index e14b3fa6f..abd7882ae 100644 --- a/tests/unit/database/test_SQLiteStorage.py +++ b/tests/unit/database/test_SQLiteStorage.py @@ -78,8 +78,7 @@ class StorageTest(AsyncioTestCase): await self.storage.close() async def store_fake_blob(self, blob_hash, length=100): - await self.storage.add_known_blob(blob_hash, length) - await self.storage.add_completed_blob(blob_hash) + await self.storage.add_completed_blob(blob_hash, length) async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"): blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]