diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py index e4da3cfa9..9629e06b6 100644 --- a/lbry/dht/blob_announcer.py +++ b/lbry/dht/blob_announcer.py @@ -28,21 +28,23 @@ class BlobAnnouncer: self.announce_task: asyncio.Task = None self.announce_queue: typing.List[str] = [] self._done = asyncio.Event() + self.announced = set() - async def _submit_announcement(self, blob_hash): - try: - - peers = len(await self.node.announce_blob(blob_hash)) - self.announcements_sent_metric.labels(peers=peers, error=False).inc() - if peers > 4: - return blob_hash - else: - log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) - except Exception as err: - self.announcements_sent_metric.labels(peers=0, error=True).inc() - if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 - raise err - log.warning("error announcing %s: %s", blob_hash[:8], str(err)) + async def _run_consumer(self): + while self.announce_queue: + try: + blob_hash = self.announce_queue.pop() + peers = len(await self.node.announce_blob(blob_hash)) + self.announcements_sent_metric.labels(peers=peers, error=False).inc() + if peers > 4: + self.announced.add(blob_hash) + else: + log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) + except Exception as err: + self.announcements_sent_metric.labels(peers=0, error=True).inc() + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 + raise err + log.warning("error announcing %s: %s", blob_hash[:8], str(err)) async def _announce(self, batch_size: typing.Optional[int] = 10): while batch_size: @@ -57,14 +59,12 @@ class BlobAnnouncer: log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) while len(self.announce_queue) > 0: log.info("%i blobs to announce", len(self.announce_queue)) - announced = await asyncio.gather(*[ - self._submit_announcement( - self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue - ], loop=self.loop) - announced = list(filter(None, announced)) + await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop) + announced = list(filter(None, self.announced)) if announced: await self.storage.update_last_announced_blobs(announced) log.info("announced %i blobs", len(announced)) + self.announced.clear() self._done.set() self._done.clear()