diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py index 5d977aff4..24cf18bbe 100644 --- a/lbry/dht/blob_announcer.py +++ b/lbry/dht/blob_announcer.py @@ -1,6 +1,9 @@ import asyncio import typing import logging + +from prometheus_client import Counter, Gauge + if typing.TYPE_CHECKING: from lbry.dht.node import Node from lbry.extras.daemon.storage import SQLiteStorage @@ -9,6 +12,15 @@ log = logging.getLogger(__name__) class BlobAnnouncer: + announcements_sent_metric = Counter( + "announcements_sent", "Number of announcements sent and their respective status.", namespace="dht_node", + labelnames=("peers", "error"), + ) + announcement_queue_size_metric = Gauge( + "announcement_queue_size", "Number of hashes waiting to be announced.", namespace="dht_node", + labelnames=("scope",) + ) + def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'): self.loop = loop self.node = node @@ -18,12 +30,15 @@ class BlobAnnouncer: async def _submit_announcement(self, blob_hash): try: + peers = len(await self.node.announce_blob(blob_hash)) + self.announcements_sent_metric.labels(peers=peers, error=False).inc() if peers > 4: return blob_hash else: log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) except Exception as err: + self.announcements_sent_metric.labels(peers=0, error=True).inc() if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise err log.warning("error announcing %s: %s", blob_hash[:8], str(err)) @@ -37,6 +52,7 @@ class BlobAnnouncer: log.warning("No peers in DHT, announce round skipped") continue self.announce_queue.extend(await self.storage.get_blobs_to_announce()) + self.announcement_queue_size_metric.labels(scope="global").set(len(self.announce_queue)) log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) while len(self.announce_queue) > 0: log.info("%i blobs to announce", len(self.announce_queue))