Merge pull request #3585 from lbryio/fix_blob_db_queries
Fixes bugs on disk space management and stream recovery
This commit is contained in:
parent
2a698932da
commit
cf73e4599f
10 changed files with 62 additions and 19 deletions
|
@ -201,7 +201,7 @@ class AbstractBlob:
|
||||||
writer = blob.get_blob_writer()
|
writer = blob.get_blob_writer()
|
||||||
writer.write(blob_bytes)
|
writer.write(blob_bytes)
|
||||||
await blob.verified.wait()
|
await blob.verified.wait()
|
||||||
return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash, added_on, is_mine)
|
return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), added_on, blob_hash, is_mine)
|
||||||
|
|
||||||
def save_verified_blob(self, verified_bytes: bytes):
|
def save_verified_blob(self, verified_bytes: bytes):
|
||||||
if self.verified.is_set():
|
if self.verified.is_set():
|
||||||
|
|
|
@ -12,8 +12,8 @@ class BlobInfo:
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, blob_num: int, length: int, iv: str,
|
self, blob_num: int, length: int, iv: str, added_on,
|
||||||
blob_hash: typing.Optional[str] = None, added_on=0, is_mine=False):
|
blob_hash: typing.Optional[str] = None, is_mine=False):
|
||||||
self.blob_hash = blob_hash
|
self.blob_hash = blob_hash
|
||||||
self.blob_num = blob_num
|
self.blob_num = blob_num
|
||||||
self.length = length
|
self.length = length
|
||||||
|
|
|
@ -83,6 +83,8 @@ class BlobManager:
|
||||||
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
|
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
|
||||||
if to_add:
|
if to_add:
|
||||||
self.completed_blob_hashes.update(to_add)
|
self.completed_blob_hashes.update(to_add)
|
||||||
|
# check blobs that aren't set as finished but were seen on disk
|
||||||
|
await self.ensure_completed_blobs_status(in_blobfiles_dir - to_add)
|
||||||
if self.config.track_bandwidth:
|
if self.config.track_bandwidth:
|
||||||
self.connection_manager.start()
|
self.connection_manager.start()
|
||||||
return True
|
return True
|
||||||
|
@ -113,9 +115,18 @@ class BlobManager:
|
||||||
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
|
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
|
||||||
)
|
)
|
||||||
|
|
||||||
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
async def ensure_completed_blobs_status(self, blob_hashes: typing.Iterable[str]):
|
||||||
"""Returns of the blobhashes_to_check, which are valid"""
|
"""Ensures that completed blobs from a given list of blob hashes are set as 'finished' in the database."""
|
||||||
return [blob_hash for blob_hash in blob_hashes if self.is_blob_verified(blob_hash)]
|
to_add = []
|
||||||
|
for blob_hash in blob_hashes:
|
||||||
|
if not self.is_blob_verified(blob_hash):
|
||||||
|
continue
|
||||||
|
blob = self.get_blob(blob_hash)
|
||||||
|
to_add.append((blob.blob_hash, blob.length, blob.added_on, blob.is_mine))
|
||||||
|
if len(to_add) > 500:
|
||||||
|
await self.storage.add_blobs(*to_add, finished=True)
|
||||||
|
to_add.clear()
|
||||||
|
return await self.storage.add_blobs(*to_add, finished=True)
|
||||||
|
|
||||||
def delete_blob(self, blob_hash: str):
|
def delete_blob(self, blob_hash: str):
|
||||||
if not is_valid_blobhash(blob_hash):
|
if not is_valid_blobhash(blob_hash):
|
||||||
|
|
|
@ -20,7 +20,7 @@ def do_migration(conf):
|
||||||
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
|
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
|
||||||
blobs_by_stream = {}
|
blobs_by_stream = {}
|
||||||
for stream_hash, position, iv, blob_hash, blob_length in blobs:
|
for stream_hash, position, iv, blob_hash, blob_length in blobs:
|
||||||
blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, blob_hash))
|
blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, 0, blob_hash))
|
||||||
|
|
||||||
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
|
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
|
||||||
sd = StreamDescriptor(None, blob_dir, stream_name, stream_key, suggested_filename,
|
sd = StreamDescriptor(None, blob_dir, stream_name, stream_key, suggested_filename,
|
||||||
|
|
|
@ -449,7 +449,8 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
return await self.db.execute_fetchall(
|
return await self.db.execute_fetchall(
|
||||||
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
||||||
"from blob left join stream_blob using (blob_hash) "
|
"from blob left join stream_blob using (blob_hash) "
|
||||||
"where stream_blob.stream_hash is null and blob.is_mine=? order by blob.added_on asc",
|
"where stream_blob.stream_hash is null and blob.is_mine=? "
|
||||||
|
"order by blob.blob_length desc, blob.added_on asc",
|
||||||
(is_mine,)
|
(is_mine,)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -479,7 +480,7 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
coalesce(sum(case when
|
coalesce(sum(case when
|
||||||
is_mine=1
|
is_mine=1
|
||||||
then blob_length else 0 end), 0) as private_storage
|
then blob_length else 0 end), 0) as private_storage
|
||||||
from blob left join stream_blob using (blob_hash)
|
from blob left join stream_blob using (blob_hash) where blob_hash not in (select sd_hash from stream)
|
||||||
""")
|
""")
|
||||||
return {
|
return {
|
||||||
'network_storage': network_size,
|
'network_storage': network_size,
|
||||||
|
@ -531,7 +532,8 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
def _get_blobs_for_stream(transaction):
|
def _get_blobs_for_stream(transaction):
|
||||||
crypt_blob_infos = []
|
crypt_blob_infos = []
|
||||||
stream_blobs = transaction.execute(
|
stream_blobs = transaction.execute(
|
||||||
"select blob_hash, position, iv from stream_blob where stream_hash=? "
|
"select s.blob_hash, s.position, s.iv, b.added_on "
|
||||||
|
"from stream_blob s left outer join blob b on b.blob_hash=s.blob_hash where stream_hash=? "
|
||||||
"order by position asc", (stream_hash, )
|
"order by position asc", (stream_hash, )
|
||||||
).fetchall()
|
).fetchall()
|
||||||
if only_completed:
|
if only_completed:
|
||||||
|
@ -551,9 +553,10 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
for blob_hash, length in lengths:
|
for blob_hash, length in lengths:
|
||||||
blob_length_dict[blob_hash] = length
|
blob_length_dict[blob_hash] = length
|
||||||
|
|
||||||
for blob_hash, position, iv in stream_blobs:
|
current_time = time.time()
|
||||||
|
for blob_hash, position, iv, added_on in stream_blobs:
|
||||||
blob_length = blob_length_dict.get(blob_hash, 0)
|
blob_length = blob_length_dict.get(blob_hash, 0)
|
||||||
crypt_blob_infos.append(BlobInfo(position, blob_length, iv, blob_hash))
|
crypt_blob_infos.append(BlobInfo(position, blob_length, iv, added_on or current_time, blob_hash))
|
||||||
if not blob_hash:
|
if not blob_hash:
|
||||||
break
|
break
|
||||||
return crypt_blob_infos
|
return crypt_blob_infos
|
||||||
|
|
|
@ -194,12 +194,13 @@ class StreamDescriptor:
|
||||||
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
|
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
|
||||||
if any(i != blob_info['blob_num'] for i, blob_info in enumerate(decoded['blobs'])):
|
if any(i != blob_info['blob_num'] for i, blob_info in enumerate(decoded['blobs'])):
|
||||||
raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
|
raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
|
||||||
|
added_on = time.time()
|
||||||
descriptor = cls(
|
descriptor = cls(
|
||||||
loop, blob_dir,
|
loop, blob_dir,
|
||||||
binascii.unhexlify(decoded['stream_name']).decode(),
|
binascii.unhexlify(decoded['stream_name']).decode(),
|
||||||
decoded['key'],
|
decoded['key'],
|
||||||
binascii.unhexlify(decoded['suggested_file_name']).decode(),
|
binascii.unhexlify(decoded['suggested_file_name']).decode(),
|
||||||
[BlobInfo(info['blob_num'], info['length'], info['iv'], info.get('blob_hash'))
|
[BlobInfo(info['blob_num'], info['length'], info['iv'], added_on, info.get('blob_hash'))
|
||||||
for info in decoded['blobs']],
|
for info in decoded['blobs']],
|
||||||
decoded['stream_hash'],
|
decoded['stream_hash'],
|
||||||
blob.blob_hash
|
blob.blob_hash
|
||||||
|
@ -266,7 +267,7 @@ class StreamDescriptor:
|
||||||
blobs.append(blob_info)
|
blobs.append(blob_info)
|
||||||
blobs.append(
|
blobs.append(
|
||||||
# add the stream terminator
|
# add the stream terminator
|
||||||
BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), None, added_on, True)
|
BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), added_on, None, True)
|
||||||
)
|
)
|
||||||
file_name = os.path.basename(file_path)
|
file_name = os.path.basename(file_path)
|
||||||
suggested_file_name = sanitize_file_name(file_name)
|
suggested_file_name = sanitize_file_name(file_name)
|
||||||
|
|
|
@ -70,6 +70,7 @@ class StreamManager(SourceManager):
|
||||||
|
|
||||||
async def recover_streams(self, file_infos: typing.List[typing.Dict]):
|
async def recover_streams(self, file_infos: typing.List[typing.Dict]):
|
||||||
to_restore = []
|
to_restore = []
|
||||||
|
to_check = []
|
||||||
|
|
||||||
async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
|
async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
|
||||||
suggested_file_name: str, key: str,
|
suggested_file_name: str, key: str,
|
||||||
|
@ -82,6 +83,7 @@ class StreamManager(SourceManager):
|
||||||
if not descriptor:
|
if not descriptor:
|
||||||
return
|
return
|
||||||
to_restore.append((descriptor, sd_blob, content_fee))
|
to_restore.append((descriptor, sd_blob, content_fee))
|
||||||
|
to_check.extend([sd_blob.blob_hash] + [blob.blob_hash for blob in descriptor.blobs[:-1]])
|
||||||
|
|
||||||
await asyncio.gather(*[
|
await asyncio.gather(*[
|
||||||
recover_stream(
|
recover_stream(
|
||||||
|
@ -93,6 +95,8 @@ class StreamManager(SourceManager):
|
||||||
|
|
||||||
if to_restore:
|
if to_restore:
|
||||||
await self.storage.recover_streams(to_restore, self.config.download_dir)
|
await self.storage.recover_streams(to_restore, self.config.download_dir)
|
||||||
|
if to_check:
|
||||||
|
await self.blob_manager.ensure_completed_blobs_status(to_check)
|
||||||
|
|
||||||
# if self.blob_manager._save_blobs:
|
# if self.blob_manager._save_blobs:
|
||||||
# log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
|
# log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
|
||||||
|
|
|
@ -573,6 +573,11 @@ class DiskSpaceManagement(CommandTestCase):
|
||||||
self.assertTrue(blobs2.issubset(blobs))
|
self.assertTrue(blobs2.issubset(blobs))
|
||||||
self.assertFalse(blobs3.issubset(blobs))
|
self.assertFalse(blobs3.issubset(blobs))
|
||||||
self.assertTrue(blobs4.issubset(blobs))
|
self.assertTrue(blobs4.issubset(blobs))
|
||||||
|
# check that added_on gets set on downloads (was a bug)
|
||||||
|
self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob"))
|
||||||
|
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
||||||
|
await self.daemon.jsonrpc_get("foo4", save_file=False)
|
||||||
|
self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob"))
|
||||||
|
|
||||||
|
|
||||||
class TestBackgroundDownloaderComponent(CommandTestCase):
|
class TestBackgroundDownloaderComponent(CommandTestCase):
|
||||||
|
|
|
@ -84,7 +84,7 @@ class StorageTest(AsyncioTestCase):
|
||||||
await self.storage.add_blobs((blob_hash, length, 0, 0), finished=True)
|
await self.storage.add_blobs((blob_hash, length, 0, 0), finished=True)
|
||||||
|
|
||||||
async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
|
async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
|
||||||
blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]
|
blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", 0, random_lbry_hash())]
|
||||||
descriptor = StreamDescriptor(
|
descriptor = StreamDescriptor(
|
||||||
asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash
|
asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash
|
||||||
)
|
)
|
||||||
|
@ -95,7 +95,7 @@ class StorageTest(AsyncioTestCase):
|
||||||
async def make_and_store_fake_stream(self, blob_count=2, stream_hash=None):
|
async def make_and_store_fake_stream(self, blob_count=2, stream_hash=None):
|
||||||
stream_hash = stream_hash or random_lbry_hash()
|
stream_hash = stream_hash or random_lbry_hash()
|
||||||
blobs = [
|
blobs = [
|
||||||
BlobInfo(i + 1, 100, "DEADBEEF", random_lbry_hash())
|
BlobInfo(i + 1, 100, "DEADBEEF", 0, random_lbry_hash())
|
||||||
for i in range(blob_count)
|
for i in range(blob_count)
|
||||||
]
|
]
|
||||||
await self.store_fake_stream(stream_hash, blobs)
|
await self.store_fake_stream(stream_hash, blobs)
|
||||||
|
|
|
@ -451,16 +451,16 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
await asyncio.sleep(0, loop=self.loop)
|
await asyncio.sleep(0, loop=self.loop)
|
||||||
self.stream_manager.stop()
|
self.stream_manager.stop()
|
||||||
self.client_blob_manager.stop()
|
self.client_blob_manager.stop()
|
||||||
|
# partial removal, only sd blob is missing.
|
||||||
|
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
|
||||||
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
|
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
|
||||||
for blob in stream.descriptor.blobs[:-1]:
|
|
||||||
os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
|
|
||||||
await self.client_blob_manager.setup()
|
await self.client_blob_manager.setup()
|
||||||
await self.stream_manager.start()
|
await self.stream_manager.start()
|
||||||
self.assertEqual(1, len(self.stream_manager.streams))
|
self.assertEqual(1, len(self.stream_manager.streams))
|
||||||
self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys()))
|
self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys()))
|
||||||
for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
|
for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
|
||||||
blob_status = await self.client_storage.get_blob_status(blob_hash)
|
blob_status = await self.client_storage.get_blob_status(blob_hash)
|
||||||
self.assertEqual('pending', blob_status)
|
self.assertEqual('finished', blob_status)
|
||||||
self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)
|
self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)
|
||||||
|
|
||||||
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
|
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
|
||||||
|
@ -468,5 +468,24 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertTrue(sd_blob.get_is_verified())
|
self.assertTrue(sd_blob.get_is_verified())
|
||||||
self.assertListEqual(expected_analytics_events, received_events)
|
self.assertListEqual(expected_analytics_events, received_events)
|
||||||
|
|
||||||
|
# full removal, check that status is preserved (except sd blob, which was written)
|
||||||
|
self.client_blob_manager.stop()
|
||||||
|
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
|
||||||
|
for blob in stream.descriptor.blobs[:-1]:
|
||||||
|
os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
|
||||||
|
await self.client_blob_manager.setup()
|
||||||
|
await self.stream_manager.start()
|
||||||
|
for blob_hash in [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
|
||||||
|
blob_status = await self.client_storage.get_blob_status(blob_hash)
|
||||||
|
self.assertEqual('pending', blob_status)
|
||||||
|
# sd blob was recovered
|
||||||
|
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
|
||||||
|
self.assertTrue(sd_blob.file_exists)
|
||||||
|
self.assertTrue(sd_blob.get_is_verified())
|
||||||
|
self.assertListEqual(expected_analytics_events, received_events)
|
||||||
|
# db reflects that too
|
||||||
|
blob_status = await self.client_storage.get_blob_status(stream.sd_hash)
|
||||||
|
self.assertEqual('finished', blob_status)
|
||||||
|
|
||||||
def test_download_then_recover_old_sort_stream_on_startup(self):
|
def test_download_then_recover_old_sort_stream_on_startup(self):
|
||||||
return self.test_download_then_recover_stream_on_startup(old_sort=True)
|
return self.test_download_then_recover_stream_on_startup(old_sort=True)
|
||||||
|
|
Loading…
Add table
Reference in a new issue