announce orphan blobs manually, as that was done when save stream

This commit is contained in:
Victor Shyba 2021-10-22 02:55:59 -03:00 committed by Jack Robison
parent 95b2c8d175
commit b39c26fc86
3 changed files with 10 additions and 0 deletions

View file

@ -421,6 +421,7 @@ class BackgroundDownloader(Component):
return return
blob_manager = self.component_manager.get_component(BLOB_COMPONENT) blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
storage = blob_manager.storage
node = None node = None
if self.component_manager.has_component(DHT_COMPONENT): if self.component_manager.has_component(DHT_COMPONENT):
node = self.component_manager.get_component(DHT_COMPONENT) node = self.component_manager.get_component(DHT_COMPONENT)
@ -430,6 +431,7 @@ class BackgroundDownloader(Component):
return return
for blob_info in downloader.descriptor.blobs[:-1]: for blob_info in downloader.descriptor.blobs[:-1]:
await downloader.download_stream_blob(blob_info) await downloader.download_stream_blob(blob_info)
await storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
async def start(self): async def start(self):
self.task = asyncio.create_task(self.loop()) self.task = asyncio.create_task(self.loop())

View file

@ -380,6 +380,11 @@ class SQLiteStorage(SQLiteMixin):
"select status from blob where blob_hash=?", blob_hash "select status from blob where blob_hash=?", blob_hash
) )
def set_announce(self, *blob_hashes):
return self.db.execute_fetchall(
"update blob set should_announce=1 where blob_hash in (?, ?)", blob_hashes
)
def update_last_announced_blobs(self, blob_hashes: typing.List[str]): def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
def _update_last_announced_blobs(transaction: sqlite3.Connection): def _update_last_announced_blobs(transaction: sqlite3.Connection):
last_announced = self.time_getter() last_announced = self.time_getter()

View file

@ -584,10 +584,13 @@ class TestProactiveDownloaderComponent(CommandTestCase):
async def assertBlobs(self, *sd_hashes, no_files=True): async def assertBlobs(self, *sd_hashes, no_files=True):
# checks that we have ony the finished blobs needed for the the referenced streams # checks that we have ony the finished blobs needed for the the referenced streams
seen = set(sd_hashes) seen = set(sd_hashes)
to_announce = await self.daemon.storage.get_blobs_to_announce()
for sd_hash in sd_hashes: for sd_hash in sd_hashes:
self.assertIn(sd_hash, to_announce)
sd_blob = self.daemon.blob_manager.get_blob(sd_hash) sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
self.assertTrue(sd_blob.get_is_verified()) self.assertTrue(sd_blob.get_is_verified())
blobs = await self.get_blobs_from_sd_blob(sd_blob) blobs = await self.get_blobs_from_sd_blob(sd_blob)
self.assertIn(blobs[0].blob_hash, to_announce)
for blob in blobs[:-1]: for blob in blobs[:-1]:
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified()) self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash) seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)