forked from LBRYCommunity/lbry-sdk
sync blob files in database with those in directory on startup
This commit is contained in:
parent
dba4a09d38
commit
39737c790f
5 changed files with 99 additions and 13 deletions
|
@ -30,13 +30,21 @@ class BlobFileManager:
|
||||||
self.blobs: typing.Dict[str, BlobFile] = {}
|
self.blobs: typing.Dict[str, BlobFile] = {}
|
||||||
|
|
||||||
async def setup(self) -> bool:
|
async def setup(self) -> bool:
|
||||||
def initialize_blob_hashes():
|
def get_files_in_blob_dir() -> typing.Set[str]:
|
||||||
self.completed_blob_hashes.update(
|
return {
|
||||||
item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
|
item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
|
||||||
)
|
}
|
||||||
await self.loop.run_in_executor(None, initialize_blob_hashes)
|
|
||||||
|
in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir)
|
||||||
|
self.completed_blob_hashes.update(await self.storage.sync_missing_blobs(in_blobfiles_dir))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
while self.blobs:
|
||||||
|
_, blob = self.blobs.popitem()
|
||||||
|
blob.close()
|
||||||
|
self.completed_blob_hashes.clear()
|
||||||
|
|
||||||
def get_blob(self, blob_hash, length: typing.Optional[int] = None):
|
def get_blob(self, blob_hash, length: typing.Optional[int] = None):
|
||||||
if blob_hash in self.blobs:
|
if blob_hash in self.blobs:
|
||||||
if length and self.blobs[blob_hash].length is None:
|
if length and self.blobs[blob_hash].length is None:
|
||||||
|
@ -55,7 +63,7 @@ class BlobFileManager:
|
||||||
raise Exception("Blob has a length of 0")
|
raise Exception("Blob has a length of 0")
|
||||||
if blob.blob_hash not in self.completed_blob_hashes:
|
if blob.blob_hash not in self.completed_blob_hashes:
|
||||||
self.completed_blob_hashes.add(blob.blob_hash)
|
self.completed_blob_hashes.add(blob.blob_hash)
|
||||||
await self.storage.add_completed_blob(blob.blob_hash)
|
await self.storage.add_completed_blob(blob.blob_hash, blob.length)
|
||||||
|
|
||||||
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
||||||
"""Returns of the blobhashes_to_check, which are valid"""
|
"""Returns of the blobhashes_to_check, which are valid"""
|
||||||
|
|
|
@ -321,9 +321,7 @@ class BlobComponent(Component):
|
||||||
return await self.blob_manager.setup()
|
return await self.blob_manager.setup()
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
while self.blob_manager and self.blob_manager.blobs:
|
self.blob_manager.stop()
|
||||||
_, blob = self.blob_manager.blobs.popitem()
|
|
||||||
blob.close()
|
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
count = 0
|
count = 0
|
||||||
|
|
|
@ -255,9 +255,16 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
|
|
||||||
# # # # # # # # # blob functions # # # # # # # # #
|
# # # # # # # # # blob functions # # # # # # # # #
|
||||||
|
|
||||||
def add_completed_blob(self, blob_hash: str):
|
def add_completed_blob(self, blob_hash: str, length: int):
|
||||||
log.debug("Adding a completed blob. blob_hash=%s", blob_hash)
|
def _add_blob(transaction: sqlite3.Connection):
|
||||||
return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, ))
|
transaction.execute(
|
||||||
|
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
|
||||||
|
(blob_hash, length, 0, 0, "pending", 0, 0)
|
||||||
|
)
|
||||||
|
transaction.execute(
|
||||||
|
"update blob set status='finished' where blob.blob_hash=?", (blob_hash, )
|
||||||
|
)
|
||||||
|
return self.db.run(_add_blob)
|
||||||
|
|
||||||
def get_blob_status(self, blob_hash: str):
|
def get_blob_status(self, blob_hash: str):
|
||||||
return self.run_and_return_one_or_none(
|
return self.run_and_return_one_or_none(
|
||||||
|
@ -351,6 +358,26 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
def get_all_blob_hashes(self):
|
def get_all_blob_hashes(self):
|
||||||
return self.run_and_return_list("select blob_hash from blob")
|
return self.run_and_return_list("select blob_hash from blob")
|
||||||
|
|
||||||
|
def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
|
||||||
|
def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
|
||||||
|
to_update = [
|
||||||
|
(blob_hash, )
|
||||||
|
for (blob_hash, ) in transaction.execute("select blob_hash from blob where status='finished'")
|
||||||
|
if blob_hash not in blob_files
|
||||||
|
]
|
||||||
|
transaction.executemany(
|
||||||
|
"update blob set status='pending' where blob_hash=?",
|
||||||
|
to_update
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
blob_hash
|
||||||
|
for blob_hash, in _batched_select(
|
||||||
|
transaction, "select blob_hash from blob where status='finished' and blob_hash in {}",
|
||||||
|
list(blob_files)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return self.db.run(_sync_blobs)
|
||||||
|
|
||||||
# # # # # # # # # stream functions # # # # # # # # #
|
# # # # # # # # # stream functions # # # # # # # # #
|
||||||
|
|
||||||
async def stream_exists(self, sd_hash: str) -> bool:
|
async def stream_exists(self, sd_hash: str) -> bool:
|
||||||
|
|
54
tests/unit/blob/test_blob_manager.py
Normal file
54
tests/unit/blob/test_blob_manager.py
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
import asyncio
|
||||||
|
import tempfile
|
||||||
|
import shutil
|
||||||
|
import os
|
||||||
|
from torba.testcase import AsyncioTestCase
|
||||||
|
from lbrynet.conf import Config
|
||||||
|
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||||
|
from lbrynet.blob.blob_manager import BlobFileManager
|
||||||
|
|
||||||
|
|
||||||
|
class TestBlobManager(AsyncioTestCase):
|
||||||
|
async def test_sync_blob_manager_on_startup(self):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
tmp_dir = tempfile.mkdtemp()
|
||||||
|
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
|
||||||
|
|
||||||
|
storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
|
||||||
|
blob_manager = BlobFileManager(loop, tmp_dir, storage)
|
||||||
|
|
||||||
|
# add a blob file
|
||||||
|
blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
|
||||||
|
blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
|
||||||
|
with open(os.path.join(blob_manager.blob_dir, blob_hash), 'wb') as f:
|
||||||
|
f.write(blob_bytes)
|
||||||
|
|
||||||
|
# it should not have been added automatically on startup
|
||||||
|
await storage.open()
|
||||||
|
await blob_manager.setup()
|
||||||
|
self.assertSetEqual(blob_manager.completed_blob_hashes, set())
|
||||||
|
|
||||||
|
# make sure we can add the blob
|
||||||
|
await blob_manager.blob_completed(blob_manager.get_blob(blob_hash, len(blob_bytes)))
|
||||||
|
self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash})
|
||||||
|
|
||||||
|
# stop the blob manager and restart it, make sure the blob is there
|
||||||
|
blob_manager.stop()
|
||||||
|
self.assertSetEqual(blob_manager.completed_blob_hashes, set())
|
||||||
|
await blob_manager.setup()
|
||||||
|
self.assertSetEqual(blob_manager.completed_blob_hashes, {blob_hash})
|
||||||
|
|
||||||
|
# test that the blob is removed upon the next startup after the file being manually deleted
|
||||||
|
blob_manager.stop()
|
||||||
|
|
||||||
|
# manually delete the blob file and restart the blob manager
|
||||||
|
os.remove(os.path.join(blob_manager.blob_dir, blob_hash))
|
||||||
|
await blob_manager.setup()
|
||||||
|
self.assertSetEqual(blob_manager.completed_blob_hashes, set())
|
||||||
|
|
||||||
|
# check that the deleted blob was updated in the database
|
||||||
|
self.assertEqual(
|
||||||
|
'pending', (
|
||||||
|
await storage.run_and_return_one_or_none('select status from blob where blob_hash=?', blob_hash)
|
||||||
|
)
|
||||||
|
)
|
|
@ -78,8 +78,7 @@ class StorageTest(AsyncioTestCase):
|
||||||
await self.storage.close()
|
await self.storage.close()
|
||||||
|
|
||||||
async def store_fake_blob(self, blob_hash, length=100):
|
async def store_fake_blob(self, blob_hash, length=100):
|
||||||
await self.storage.add_known_blob(blob_hash, length)
|
await self.storage.add_completed_blob(blob_hash, length)
|
||||||
await self.storage.add_completed_blob(blob_hash)
|
|
||||||
|
|
||||||
async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
|
async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
|
||||||
blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]
|
blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]
|
||||||
|
|
Loading…
Reference in a new issue