fix bug where recovery doesn't update blob status

This commit is contained in:
Victor Shyba 2022-03-21 21:33:33 -03:00
parent c5e2f19dde
commit aac72fa512
3 changed files with 35 additions and 6 deletions

View file

@@ -113,9 +113,15 @@ class BlobManager:
(blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False) (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
) )
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]: def ensure_completed_blobs_status(self, blob_hashes: typing.List[str]) -> asyncio.Task:
"""Returns of the blobhashes_to_check, which are valid""" """Ensures that completed blobs from a given list of blob hashes are set as 'finished' in the database."""
return [blob_hash for blob_hash in blob_hashes if self.is_blob_verified(blob_hash)] to_add = []
for blob_hash in blob_hashes:
if not self.is_blob_verified(blob_hash):
continue
blob = self.get_blob(blob_hash)
to_add.append((blob.blob_hash, blob.length, blob.added_on, blob.is_mine))
return self.loop.create_task(self.storage.add_blobs(*to_add, finished=True))
def delete_blob(self, blob_hash: str): def delete_blob(self, blob_hash: str):
if not is_valid_blobhash(blob_hash): if not is_valid_blobhash(blob_hash):

View file

@@ -70,6 +70,7 @@ class StreamManager(SourceManager):
async def recover_streams(self, file_infos: typing.List[typing.Dict]): async def recover_streams(self, file_infos: typing.List[typing.Dict]):
to_restore = [] to_restore = []
to_check = []
async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str, async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
suggested_file_name: str, key: str, suggested_file_name: str, key: str,
@@ -82,6 +83,7 @@ class StreamManager(SourceManager):
if not descriptor: if not descriptor:
return return
to_restore.append((descriptor, sd_blob, content_fee)) to_restore.append((descriptor, sd_blob, content_fee))
to_check.extend([sd_blob.blob_hash] + [blob.blob_hash for blob in descriptor.blobs[:-1]])
await asyncio.gather(*[ await asyncio.gather(*[
recover_stream( recover_stream(
@@ -93,6 +95,8 @@ class StreamManager(SourceManager):
if to_restore: if to_restore:
await self.storage.recover_streams(to_restore, self.config.download_dir) await self.storage.recover_streams(to_restore, self.config.download_dir)
if to_check:
await self.blob_manager.ensure_completed_blobs_status(to_check)
# if self.blob_manager._save_blobs: # if self.blob_manager._save_blobs:
# log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos)) # log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))

View file

@@ -451,16 +451,16 @@ class TestStreamManager(BlobExchangeTestBase):
await asyncio.sleep(0, loop=self.loop) await asyncio.sleep(0, loop=self.loop)
self.stream_manager.stop() self.stream_manager.stop()
self.client_blob_manager.stop() self.client_blob_manager.stop()
# partial removal, only sd blob is missing.
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash)) os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
for blob in stream.descriptor.blobs[:-1]:
os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
await self.client_blob_manager.setup() await self.client_blob_manager.setup()
await self.stream_manager.start() await self.stream_manager.start()
self.assertEqual(1, len(self.stream_manager.streams)) self.assertEqual(1, len(self.stream_manager.streams))
self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys())) self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys()))
for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]: for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
blob_status = await self.client_storage.get_blob_status(blob_hash) blob_status = await self.client_storage.get_blob_status(blob_hash)
self.assertEqual('pending', blob_status) self.assertEqual('finished', blob_status)
self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status) self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash) sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
@@ -468,5 +468,24 @@ class TestStreamManager(BlobExchangeTestBase):
self.assertTrue(sd_blob.get_is_verified()) self.assertTrue(sd_blob.get_is_verified())
self.assertListEqual(expected_analytics_events, received_events) self.assertListEqual(expected_analytics_events, received_events)
# full removal, check that status is preserved (except sd blob, which was written)
self.client_blob_manager.stop()
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
for blob in stream.descriptor.blobs[:-1]:
os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
await self.client_blob_manager.setup()
await self.stream_manager.start()
for blob_hash in [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
blob_status = await self.client_storage.get_blob_status(blob_hash)
self.assertEqual('pending', blob_status)
# sd blob was recovered
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
self.assertTrue(sd_blob.file_exists)
self.assertTrue(sd_blob.get_is_verified())
self.assertListEqual(expected_analytics_events, received_events)
# db reflects that too
blob_status = await self.client_storage.get_blob_status(stream.sd_hash)
self.assertEqual('finished', blob_status)
def test_download_then_recover_old_sort_stream_on_startup(self): def test_download_then_recover_old_sort_stream_on_startup(self):
return self.test_download_then_recover_stream_on_startup(old_sort=True) return self.test_download_then_recover_stream_on_startup(old_sort=True)