fix setting should_announce and blob status
This commit is contained in:
parent
5af351a234
commit
8cda3143f2
4 changed files with 13 additions and 7 deletions
|
@ -59,9 +59,8 @@ class BlobFileManager:
|
||||||
blobs = [self.get_blob(b) for b in blob_hashes]
|
blobs = [self.get_blob(b) for b in blob_hashes]
|
||||||
return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
||||||
|
|
||||||
async def set_should_announce(self, blob_hash: str, should_announce: bool):
|
async def set_should_announce(self, blob_hash: str, should_announce: int):
|
||||||
now = self.loop.time()
|
return await self.storage.set_should_announce(blob_hash, should_announce)
|
||||||
return await self.storage.set_should_announce(blob_hash, now, should_announce)
|
|
||||||
|
|
||||||
async def get_all_verified_blobs(self) -> typing.List[str]:
|
async def get_all_verified_blobs(self) -> typing.List[str]:
|
||||||
blob_hashes = await self.storage.get_all_blob_hashes()
|
blob_hashes = await self.storage.get_all_blob_hashes()
|
||||||
|
|
|
@ -208,10 +208,10 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
log.debug("Adding a completed blob. blob_hash=%s", blob_hash)
|
log.debug("Adding a completed blob. blob_hash=%s", blob_hash)
|
||||||
return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, ))
|
return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, ))
|
||||||
|
|
||||||
def set_should_announce(self, blob_hash: str, next_announce_time: int, should_announce: int):
|
def set_should_announce(self, blob_hash: str, should_announce: int):
|
||||||
return self.db.execute(
|
return self.db.execute(
|
||||||
"update blob set next_announce_time=?, should_announce=? where blob_hash=?",
|
"update blob set next_announce_time=?, should_announce=? where blob_hash=?",
|
||||||
(next_announce_time or 0, int(bool(should_announce)), blob_hash)
|
(int(self.time_getter()), should_announce, blob_hash)
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_blob_status(self, blob_hash: str):
|
def get_blob_status(self, blob_hash: str):
|
||||||
|
|
|
@ -47,9 +47,11 @@ class StreamDownloader(StreamAssembler):
|
||||||
async def after_got_descriptor(self):
|
async def after_got_descriptor(self):
|
||||||
self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
|
self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
|
||||||
log.info("added head blob to search")
|
log.info("added head blob to search")
|
||||||
|
await self.blob_manager.set_should_announce(self.sd_hash, 1)
|
||||||
|
|
||||||
async def after_finished(self):
|
async def after_finished(self):
|
||||||
log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
|
log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
|
||||||
|
await self.blob_manager.set_should_announce(self.descriptor.blobs[0].blob_hash, 1)
|
||||||
await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
|
await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
|
@ -154,10 +154,14 @@ class ManagedStream:
|
||||||
loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator
|
loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator
|
||||||
)
|
)
|
||||||
sd_blob = blob_manager.get_blob(descriptor.sd_hash)
|
sd_blob = blob_manager.get_blob(descriptor.sd_hash)
|
||||||
await blob_manager.blob_completed(sd_blob)
|
|
||||||
await blob_manager.storage.store_stream(
|
await blob_manager.storage.store_stream(
|
||||||
blob_manager.get_blob(descriptor.sd_hash), descriptor
|
blob_manager.get_blob(descriptor.sd_hash), descriptor
|
||||||
)
|
)
|
||||||
|
await blob_manager.blob_completed(sd_blob)
|
||||||
|
for blob in descriptor.blobs[:-1]:
|
||||||
|
await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length))
|
||||||
|
await blob_manager.set_should_announce(sd_blob.blob_hash, 1)
|
||||||
|
await blob_manager.set_should_announce(descriptor.blobs[0].blob_hash, 1)
|
||||||
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
||||||
status=cls.STATUS_FINISHED)
|
status=cls.STATUS_FINISHED)
|
||||||
|
|
||||||
|
@ -185,7 +189,8 @@ class ManagedStream:
|
||||||
self.fully_reflected.set()
|
self.fully_reflected.set()
|
||||||
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
||||||
return []
|
return []
|
||||||
for blob_hash in needed:
|
we_have = [blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes]
|
||||||
|
for blob_hash in we_have:
|
||||||
await protocol.send_blob(blob_hash)
|
await protocol.send_blob(blob_hash)
|
||||||
sent.append(blob_hash)
|
sent.append(blob_hash)
|
||||||
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
|
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
|
||||||
|
|
Loading…
Reference in a new issue