Merge pull request #3548 from lbryio/announce_metrics
Add optional Prometheus metrics for DHT announcements
This commit is contained in:
commit
3305eb67c6
1 changed files with 16 additions and 0 deletions
|
@ -1,6 +1,9 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from prometheus_client import Counter, Gauge
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
|
@ -9,6 +12,15 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class BlobAnnouncer:
|
class BlobAnnouncer:
|
||||||
|
announcements_sent_metric = Counter(
|
||||||
|
"announcements_sent", "Number of announcements sent and their respective status.", namespace="dht_node",
|
||||||
|
labelnames=("peers", "error"),
|
||||||
|
)
|
||||||
|
announcement_queue_size_metric = Gauge(
|
||||||
|
"announcement_queue_size", "Number of hashes waiting to be announced.", namespace="dht_node",
|
||||||
|
labelnames=("scope",)
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'):
|
def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.node = node
|
self.node = node
|
||||||
|
@ -18,12 +30,15 @@ class BlobAnnouncer:
|
||||||
|
|
||||||
async def _submit_announcement(self, blob_hash):
|
async def _submit_announcement(self, blob_hash):
|
||||||
try:
|
try:
|
||||||
|
|
||||||
peers = len(await self.node.announce_blob(blob_hash))
|
peers = len(await self.node.announce_blob(blob_hash))
|
||||||
|
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
|
||||||
if peers > 4:
|
if peers > 4:
|
||||||
return blob_hash
|
return blob_hash
|
||||||
else:
|
else:
|
||||||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
||||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
raise err
|
raise err
|
||||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||||
|
@ -37,6 +52,7 @@ class BlobAnnouncer:
|
||||||
log.warning("No peers in DHT, announce round skipped")
|
log.warning("No peers in DHT, announce round skipped")
|
||||||
continue
|
continue
|
||||||
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
|
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
|
||||||
|
self.announcement_queue_size_metric.labels(scope="global").set(len(self.announce_queue))
|
||||||
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
||||||
while len(self.announce_queue) > 0:
|
while len(self.announce_queue) > 0:
|
||||||
log.info("%i blobs to announce", len(self.announce_queue))
|
log.info("%i blobs to announce", len(self.announce_queue))
|
||||||
|
|
Loading…
Reference in a new issue