mempool_process_time_metric
This commit is contained in:
parent
caf616234b
commit
70596042d6
1 changed files with 15 additions and 1 deletions
|
@ -13,6 +13,7 @@ import time
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from concurrent.futures.thread import ThreadPoolExecutor
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
|
from prometheus_client import Histogram
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
|
@ -79,6 +80,12 @@ class MemPoolAPI(ABC):
|
||||||
daemon's height at the time the mempool was obtained."""
|
daemon's height at the time the mempool was obtained."""
|
||||||
|
|
||||||
|
|
||||||
|
NAMESPACE = "wallet_server"
|
||||||
|
HISTOGRAM_BUCKETS = (
|
||||||
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MemPool:
|
class MemPool:
|
||||||
"""Representation of the daemon's mempool.
|
"""Representation of the daemon's mempool.
|
||||||
|
|
||||||
|
@ -107,6 +114,9 @@ class MemPool:
|
||||||
self.lock = asyncio.Lock()
|
self.lock = asyncio.Lock()
|
||||||
self.wakeup = asyncio.Event()
|
self.wakeup = asyncio.Event()
|
||||||
self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
|
self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
|
||||||
|
self.mempool_process_time_metric = Histogram(
|
||||||
|
"processed_mempool", "Time to process mempool and notify touched addresses", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||||
|
)
|
||||||
|
|
||||||
async def _logging(self, synchronized_event):
|
async def _logging(self, synchronized_event):
|
||||||
"""Print regular logs of mempool stats."""
|
"""Print regular logs of mempool stats."""
|
||||||
|
@ -207,6 +217,7 @@ class MemPool:
|
||||||
async def _refresh_hashes(self, synchronized_event):
|
async def _refresh_hashes(self, synchronized_event):
|
||||||
"""Refresh our view of the daemon's mempool."""
|
"""Refresh our view of the daemon's mempool."""
|
||||||
while True:
|
while True:
|
||||||
|
start = time.perf_counter()
|
||||||
height = self.api.cached_height()
|
height = self.api.cached_height()
|
||||||
hex_hashes = await self.api.mempool_hashes()
|
hex_hashes = await self.api.mempool_hashes()
|
||||||
if height != await self.api.height():
|
if height != await self.api.height():
|
||||||
|
@ -217,13 +228,16 @@ class MemPool:
|
||||||
synchronized_event.set()
|
synchronized_event.set()
|
||||||
synchronized_event.clear()
|
synchronized_event.clear()
|
||||||
await self.api.on_mempool(touched, height)
|
await self.api.on_mempool(touched, height)
|
||||||
|
timed_out = False
|
||||||
try:
|
try:
|
||||||
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
|
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
|
||||||
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
|
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
pass
|
timed_out = True
|
||||||
finally:
|
finally:
|
||||||
self.wakeup.clear()
|
self.wakeup.clear()
|
||||||
|
duration = time.perf_counter() - start - (0 if not timed_out else self.refresh_secs)
|
||||||
|
self.mempool_process_time_metric.observe(duration)
|
||||||
|
|
||||||
async def _process_mempool(self, all_hashes):
|
async def _process_mempool(self, all_hashes):
|
||||||
# Re-sync with the new set of hashes
|
# Re-sync with the new set of hashes
|
||||||
|
|
Loading…
Add table
Reference in a new issue