forked from LBRYCommunity/lbry-sdk
Merge pull request #1860 from lbryio/fix-announce
Fix setting blob completed and next announce time
This commit is contained in:
commit
af43b21ef6
5 changed files with 14 additions and 7 deletions
|
@ -59,9 +59,8 @@ class BlobFileManager:
|
|||
blobs = [self.get_blob(b) for b in blob_hashes]
|
||||
return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
||||
|
||||
async def set_should_announce(self, blob_hash: str, should_announce: bool):
|
||||
now = self.loop.time()
|
||||
return await self.storage.set_should_announce(blob_hash, now, should_announce)
|
||||
async def set_should_announce(self, blob_hash: str, should_announce: int):
|
||||
return await self.storage.set_should_announce(blob_hash, should_announce)
|
||||
|
||||
async def get_all_verified_blobs(self) -> typing.List[str]:
|
||||
blob_hashes = await self.storage.get_all_blob_hashes()
|
||||
|
|
|
@ -247,6 +247,7 @@ def main(argv=None):
|
|||
)
|
||||
file_handler.setFormatter(default_formatter)
|
||||
log.addHandler(file_handler)
|
||||
logging.getLogger('torba').addHandler(file_handler)
|
||||
|
||||
if not args.quiet:
|
||||
handler = logging.StreamHandler()
|
||||
|
|
|
@ -208,10 +208,10 @@ class SQLiteStorage(SQLiteMixin):
|
|||
log.debug("Adding a completed blob. blob_hash=%s", blob_hash)
|
||||
return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, ))
|
||||
|
||||
def set_should_announce(self, blob_hash: str, next_announce_time: int, should_announce: int):
|
||||
def set_should_announce(self, blob_hash: str, should_announce: int):
|
||||
return self.db.execute(
|
||||
"update blob set next_announce_time=?, should_announce=? where blob_hash=?",
|
||||
(next_announce_time or 0, int(bool(should_announce)), blob_hash)
|
||||
(int(self.time_getter()), should_announce, blob_hash)
|
||||
)
|
||||
|
||||
def get_blob_status(self, blob_hash: str):
|
||||
|
|
|
@ -47,9 +47,11 @@ class StreamDownloader(StreamAssembler):
|
|||
async def after_got_descriptor(self):
|
||||
self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
|
||||
log.info("added head blob to search")
|
||||
await self.blob_manager.set_should_announce(self.sd_hash, 1)
|
||||
|
||||
async def after_finished(self):
|
||||
log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
|
||||
await self.blob_manager.set_should_announce(self.descriptor.blobs[0].blob_hash, 1)
|
||||
await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -154,10 +154,14 @@ class ManagedStream:
|
|||
loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator
|
||||
)
|
||||
sd_blob = blob_manager.get_blob(descriptor.sd_hash)
|
||||
await blob_manager.blob_completed(sd_blob)
|
||||
await blob_manager.storage.store_stream(
|
||||
blob_manager.get_blob(descriptor.sd_hash), descriptor
|
||||
)
|
||||
await blob_manager.blob_completed(sd_blob)
|
||||
for blob in descriptor.blobs[:-1]:
|
||||
await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length))
|
||||
await blob_manager.set_should_announce(sd_blob.blob_hash, 1)
|
||||
await blob_manager.set_should_announce(descriptor.blobs[0].blob_hash, 1)
|
||||
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
||||
status=cls.STATUS_FINISHED)
|
||||
|
||||
|
@ -185,7 +189,8 @@ class ManagedStream:
|
|||
self.fully_reflected.set()
|
||||
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
||||
return []
|
||||
for blob_hash in needed:
|
||||
we_have = [blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes]
|
||||
for blob_hash in we_have:
|
||||
await protocol.send_blob(blob_hash)
|
||||
sent.append(blob_hash)
|
||||
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
|
||||
|
|
Loading…
Reference in a new issue