forked from LBRYCommunity/lbry-sdk
Merge pull request #3135 from lbryio/loop-metrics
add prometheus metrics for asyncio loop
This commit is contained in:
commit
42ad2bb83f
2 changed files with 39 additions and 3 deletions
|
@ -1,14 +1,44 @@
|
||||||
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import asyncio.tasks
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from prometheus_client import generate_latest as prom_generate_latest
|
from prometheus_client import generate_latest as prom_generate_latest
|
||||||
|
from prometheus_client import Counter, Histogram, Gauge
|
||||||
|
|
||||||
|
|
||||||
|
PROBES_IN_FLIGHT = Counter("probes_in_flight", "Number of loop probes in flight", namespace='asyncio')
|
||||||
|
PROBES_FINISHED = Counter("probes_finished", "Number of finished loop probes", namespace='asyncio')
|
||||||
|
PROBE_TIMES = Histogram("probe_times", "Loop probe times", namespace='asyncio')
|
||||||
|
TASK_COUNT = Gauge("running_tasks", "Number of running tasks", namespace='asyncio')
|
||||||
|
|
||||||
|
|
||||||
|
def get_loop_metrics(delay=1):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def callback(started):
|
||||||
|
PROBE_TIMES.observe(time.perf_counter() - started - delay)
|
||||||
|
PROBES_FINISHED.inc()
|
||||||
|
|
||||||
|
async def monitor_loop_responsiveness():
|
||||||
|
while True:
|
||||||
|
now = time.perf_counter()
|
||||||
|
loop.call_later(delay, callback, now)
|
||||||
|
PROBES_IN_FLIGHT.inc()
|
||||||
|
TASK_COUNT.set(len(asyncio.tasks._all_tasks))
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
|
||||||
|
return loop.create_task(monitor_loop_responsiveness())
|
||||||
|
|
||||||
|
|
||||||
class PrometheusServer:
|
class PrometheusServer:
|
||||||
def __init__(self, logger=None):
|
def __init__(self, logger=None):
|
||||||
self.runner = None
|
self.runner = None
|
||||||
self.logger = logger or logging.getLogger(__name__)
|
self.logger = logger or logging.getLogger(__name__)
|
||||||
|
self._monitor_loop_task = None
|
||||||
|
|
||||||
async def start(self, interface: str, port: int):
|
async def start(self, interface: str, port: int):
|
||||||
|
self.logger.info("start prometheus metrics")
|
||||||
prom_app = web.Application()
|
prom_app = web.Application()
|
||||||
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
||||||
self.runner = web.AppRunner(prom_app)
|
self.runner = web.AppRunner(prom_app)
|
||||||
|
@ -16,7 +46,10 @@ class PrometheusServer:
|
||||||
|
|
||||||
metrics_site = web.TCPSite(self.runner, interface, port, shutdown_timeout=.5)
|
metrics_site = web.TCPSite(self.runner, interface, port, shutdown_timeout=.5)
|
||||||
await metrics_site.start()
|
await metrics_site.start()
|
||||||
self.logger.info('metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2])
|
self.logger.info(
|
||||||
|
'prometheus metrics server listening on %s:%i', *metrics_site._server.sockets[0].getsockname()[:2]
|
||||||
|
)
|
||||||
|
self._monitor_loop_task = get_loop_metrics()
|
||||||
|
|
||||||
async def handle_metrics_get_request(self, request: web.Request):
|
async def handle_metrics_get_request(self, request: web.Request):
|
||||||
try:
|
try:
|
||||||
|
@ -29,4 +62,7 @@ class PrometheusServer:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
if self._monitor_loop_task and not self._monitor_loop_task.done():
|
||||||
|
self._monitor_loop_task.cancel()
|
||||||
|
self._monitor_loop_task = None
|
||||||
await self.runner.cleanup()
|
await self.runner.cleanup()
|
||||||
|
|
|
@ -4,9 +4,9 @@ from lbry.extras.daemon.exchange_rate_manager import ExchangeRate, ExchangeRateM
|
||||||
|
|
||||||
|
|
||||||
class TestExchangeRateManager(AsyncioTestCase):
|
class TestExchangeRateManager(AsyncioTestCase):
|
||||||
|
|
||||||
async def test_exchange_rate_manager(self):
|
async def test_exchange_rate_manager(self):
|
||||||
# TODO: re-enable cryptonator.com
|
# TODO: re-enable cryptonator.com
|
||||||
|
# TODO: this uses real exchange rate feeds... update to use mocks
|
||||||
manager = ExchangeRateManager(FEEDS)
|
manager = ExchangeRateManager(FEEDS)
|
||||||
manager.start()
|
manager.start()
|
||||||
self.addCleanup(manager.stop)
|
self.addCleanup(manager.stop)
|
||||||
|
@ -18,5 +18,5 @@ class TestExchangeRateManager(AsyncioTestCase):
|
||||||
self.assertTrue(feed.is_online)
|
self.assertTrue(feed.is_online)
|
||||||
self.assertIsInstance(feed.rate, ExchangeRate)
|
self.assertIsInstance(feed.rate, ExchangeRate)
|
||||||
lbc = manager.convert_currency('USD', 'LBC', Decimal('0.01'))
|
lbc = manager.convert_currency('USD', 'LBC', Decimal('0.01'))
|
||||||
self.assertGreaterEqual(lbc, 0.1)
|
self.assertGreaterEqual(lbc, 0.01)
|
||||||
self.assertLessEqual(lbc, 10.0)
|
self.assertLessEqual(lbc, 10.0)
|
||||||
|
|
Loading…
Reference in a new issue