implement announcer as a consumer task on gather

This commit is contained in:
Victor Shyba 2022-02-15 00:55:36 -03:00 committed by Victor Shyba
parent 7ded8a1333
commit f05943ff79

View file

@ -28,21 +28,23 @@ class BlobAnnouncer:
self.announce_task: asyncio.Task = None self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = [] self.announce_queue: typing.List[str] = []
self._done = asyncio.Event() self._done = asyncio.Event()
self.announced = set()
async def _submit_announcement(self, blob_hash): async def _run_consumer(self):
try: while self.announce_queue:
try:
peers = len(await self.node.announce_blob(blob_hash)) blob_hash = self.announce_queue.pop()
self.announcements_sent_metric.labels(peers=peers, error=False).inc() peers = len(await self.node.announce_blob(blob_hash))
if peers > 4: self.announcements_sent_metric.labels(peers=peers, error=False).inc()
return blob_hash if peers > 4:
else: self.announced.add(blob_hash)
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) else:
except Exception as err: log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
self.announcements_sent_metric.labels(peers=0, error=True).inc() except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 self.announcements_sent_metric.labels(peers=0, error=True).inc()
raise err if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
log.warning("error announcing %s: %s", blob_hash[:8], str(err)) raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10): async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size: while batch_size:
@ -57,14 +59,12 @@ class BlobAnnouncer:
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0: while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue)) log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[ await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
self._submit_announcement( announced = list(filter(None, self.announced))
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
if announced: if announced:
await self.storage.update_last_announced_blobs(announced) await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced)) log.info("announced %i blobs", len(announced))
self.announced.clear()
self._done.set() self._done.set()
self._done.clear() self._done.clear()