From aac72fa51296c6e7b1fbb35645697cafb6d8cab1 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 21 Mar 2022 21:33:33 -0300
Subject: [PATCH] fix bug where recovery doesnt update blob status

---
 lbry/blob/blob_manager.py                | 12 +++++++++---
 lbry/stream/stream_manager.py            |  4 ++++
 tests/unit/stream/test_stream_manager.py | 25 +++++++++++++++++++++---
 3 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py
index 52441ecfb..15709be08 100644
--- a/lbry/blob/blob_manager.py
+++ b/lbry/blob/blob_manager.py
@@ -113,9 +113,15 @@ class BlobManager:
             (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
         )
 
-    def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
-        """Returns of the blobhashes_to_check, which are valid"""
-        return [blob_hash for blob_hash in blob_hashes if self.is_blob_verified(blob_hash)]
+    def ensure_completed_blobs_status(self, blob_hashes: typing.List[str]) -> asyncio.Task:
+        """Ensures that completed blobs from a given list of blob hashes are set as 'finished' in the database."""
+        to_add = []
+        for blob_hash in blob_hashes:
+            if not self.is_blob_verified(blob_hash):
+                continue
+            blob = self.get_blob(blob_hash)
+            to_add.append((blob.blob_hash, blob.length, blob.added_on, blob.is_mine))
+        return self.loop.create_task(self.storage.add_blobs(*to_add, finished=True))
 
     def delete_blob(self, blob_hash: str):
         if not is_valid_blobhash(blob_hash):
diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py
index 72fd1414a..6b4f705ff 100644
--- a/lbry/stream/stream_manager.py
+++ b/lbry/stream/stream_manager.py
@@ -70,6 +70,7 @@ class StreamManager(SourceManager):
 
     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
+        to_check = []
 
         async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
                                  suggested_file_name: str, key: str,
@@ -82,6 +83,7 @@ class StreamManager(SourceManager):
             if not descriptor:
                 return
             to_restore.append((descriptor, sd_blob, content_fee))
+            to_check.extend([sd_blob.blob_hash] + [blob.blob_hash for blob in descriptor.blobs[:-1]])
 
         await asyncio.gather(*[
             recover_stream(
@@ -93,6 +95,8 @@ class StreamManager(SourceManager):
 
         if to_restore:
             await self.storage.recover_streams(to_restore, self.config.download_dir)
+        if to_check:
+            await self.blob_manager.ensure_completed_blobs_status(to_check)
 
         # if self.blob_manager._save_blobs:
         #     log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py
index a6767509f..76e2a5c04 100644
--- a/tests/unit/stream/test_stream_manager.py
+++ b/tests/unit/stream/test_stream_manager.py
@@ -451,16 +451,16 @@ class TestStreamManager(BlobExchangeTestBase):
         await asyncio.sleep(0, loop=self.loop)
         self.stream_manager.stop()
         self.client_blob_manager.stop()
+        # partial removal, only sd blob is missing.
+        # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
         os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
-        for blob in stream.descriptor.blobs[:-1]:
-            os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
         await self.client_blob_manager.setup()
         await self.stream_manager.start()
         self.assertEqual(1, len(self.stream_manager.streams))
         self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys()))
         for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
             blob_status = await self.client_storage.get_blob_status(blob_hash)
-            self.assertEqual('pending', blob_status)
+            self.assertEqual('finished', blob_status)
         self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)
 
         sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
@@ -468,5 +468,24 @@ class TestStreamManager(BlobExchangeTestBase):
         self.assertTrue(sd_blob.get_is_verified())
         self.assertListEqual(expected_analytics_events, received_events)
 
+        # full removal, check that status is preserved (except sd blob, which was written)
+        self.client_blob_manager.stop()
+        os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
+        for blob in stream.descriptor.blobs[:-1]:
+            os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
+        await self.client_blob_manager.setup()
+        await self.stream_manager.start()
+        for blob_hash in [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
+            blob_status = await self.client_storage.get_blob_status(blob_hash)
+            self.assertEqual('pending', blob_status)
+        # sd blob was recovered
+        sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
+        self.assertTrue(sd_blob.file_exists)
+        self.assertTrue(sd_blob.get_is_verified())
+        self.assertListEqual(expected_analytics_events, received_events)
+        # db reflects that too
+        blob_status = await self.client_storage.get_blob_status(stream.sd_hash)
+        self.assertEqual('finished', blob_status)
+
     def test_download_then_recover_old_sort_stream_on_startup(self):
         return self.test_download_then_recover_stream_on_startup(old_sort=True)
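Below is a minimal, self-contained Python sketch, not part of the patch above, of the pattern the new BlobManager.ensure_completed_blobs_status follows: collect the blob hashes that are verified on disk and flip their stored status to 'finished' in one batched storage call. The FakeBlob/FakeStorage/FakeBlobManager classes are invented stand-ins for the real lbry classes, used only to make the example runnable.

# Sketch of the "re-mark verified blobs as finished" pattern introduced by this patch.
# All Fake* classes are hypothetical stand-ins for lbry's BlobManager / SQLiteStorage.
import asyncio
import typing


class FakeBlob:
    def __init__(self, blob_hash: str, length: int, verified: bool):
        self.blob_hash = blob_hash
        self.length = length
        self.added_on = 0
        self.is_mine = False
        self.verified = verified


class FakeStorage:
    def __init__(self):
        self.status: typing.Dict[str, str] = {}

    async def add_blobs(self, *blob_infos, finished: bool = False):
        # each info is (blob_hash, length, added_on, is_mine), mirroring the patch
        for blob_hash, _length, _added_on, _is_mine in blob_infos:
            self.status[blob_hash] = 'finished' if finished else 'pending'


class FakeBlobManager:
    def __init__(self, loop: asyncio.AbstractEventLoop, storage: FakeStorage, blobs):
        self.loop = loop
        self.storage = storage
        self.blobs = {blob.blob_hash: blob for blob in blobs}

    def is_blob_verified(self, blob_hash: str) -> bool:
        return self.blobs[blob_hash].verified

    def get_blob(self, blob_hash: str) -> FakeBlob:
        return self.blobs[blob_hash]

    def ensure_completed_blobs_status(self, blob_hashes) -> asyncio.Task:
        # same shape as the new method: keep only verified blobs,
        # then update their status to 'finished' in one storage call
        to_add = []
        for blob_hash in blob_hashes:
            if not self.is_blob_verified(blob_hash):
                continue
            blob = self.get_blob(blob_hash)
            to_add.append((blob.blob_hash, blob.length, blob.added_on, blob.is_mine))
        return self.loop.create_task(self.storage.add_blobs(*to_add, finished=True))


async def main():
    storage = FakeStorage()
    blobs = [FakeBlob('aa' * 48, 100, verified=True), FakeBlob('bb' * 48, 100, verified=False)]
    manager = FakeBlobManager(asyncio.get_running_loop(), storage, blobs)
    await manager.ensure_completed_blobs_status([blob.blob_hash for blob in blobs])
    print(storage.status)  # only the verified blob is marked 'finished'


asyncio.run(main())

Running the sketch leaves only the verified blob marked 'finished', which is the state the recovery path in recover_streams now restores for blobs that are still present on disk.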