forked from LBRYCommunity/lbry-sdk
announce orphan blobs manually, as that was done when save stream
This commit is contained in:
parent
813108b9d8
commit
c5f6a4ca1c
3 changed files with 10 additions and 0 deletions
|
@ -421,6 +421,7 @@ class BackgroundDownloader(Component):
|
||||||
return
|
return
|
||||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
||||||
|
storage = blob_manager.storage
|
||||||
node = None
|
node = None
|
||||||
if self.component_manager.has_component(DHT_COMPONENT):
|
if self.component_manager.has_component(DHT_COMPONENT):
|
||||||
node = self.component_manager.get_component(DHT_COMPONENT)
|
node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
|
@ -430,6 +431,7 @@ class BackgroundDownloader(Component):
|
||||||
return
|
return
|
||||||
for blob_info in downloader.descriptor.blobs[:-1]:
|
for blob_info in downloader.descriptor.blobs[:-1]:
|
||||||
await downloader.download_stream_blob(blob_info)
|
await downloader.download_stream_blob(blob_info)
|
||||||
|
await storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.task = asyncio.create_task(self.loop())
|
self.task = asyncio.create_task(self.loop())
|
||||||
|
|
|
@ -380,6 +380,11 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
"select status from blob where blob_hash=?", blob_hash
|
"select status from blob where blob_hash=?", blob_hash
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def set_announce(self, *blob_hashes):
|
||||||
|
return self.db.execute_fetchall(
|
||||||
|
"update blob set should_announce=1 where blob_hash in (?, ?)", blob_hashes
|
||||||
|
)
|
||||||
|
|
||||||
def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
|
def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
|
||||||
def _update_last_announced_blobs(transaction: sqlite3.Connection):
|
def _update_last_announced_blobs(transaction: sqlite3.Connection):
|
||||||
last_announced = self.time_getter()
|
last_announced = self.time_getter()
|
||||||
|
|
|
@ -584,10 +584,13 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
async def assertBlobs(self, *sd_hashes, no_files=True):
|
async def assertBlobs(self, *sd_hashes, no_files=True):
|
||||||
# checks that we have ony the finished blobs needed for the the referenced streams
|
# checks that we have ony the finished blobs needed for the the referenced streams
|
||||||
seen = set(sd_hashes)
|
seen = set(sd_hashes)
|
||||||
|
to_announce = await self.daemon.storage.get_blobs_to_announce()
|
||||||
for sd_hash in sd_hashes:
|
for sd_hash in sd_hashes:
|
||||||
|
self.assertIn(sd_hash, to_announce)
|
||||||
sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
|
sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
|
||||||
self.assertTrue(sd_blob.get_is_verified())
|
self.assertTrue(sd_blob.get_is_verified())
|
||||||
blobs = await self.get_blobs_from_sd_blob(sd_blob)
|
blobs = await self.get_blobs_from_sd_blob(sd_blob)
|
||||||
|
self.assertIn(blobs[0].blob_hash, to_announce)
|
||||||
for blob in blobs[:-1]:
|
for blob in blobs[:-1]:
|
||||||
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
|
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
|
||||||
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
|
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
|
||||||
|
|
Loading…
Reference in a new issue