diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py
index 03d3cd03c..bffd4e8ee 100644
--- a/lbry/wallet/server/mempool.py
+++ b/lbry/wallet/server/mempool.py
@@ -13,6 +13,7 @@ import time
 from abc import ABC, abstractmethod
 from collections import defaultdict
 from concurrent.futures.thread import ThreadPoolExecutor
+from prometheus_client import Histogram
 
 import attr
 
@@ -79,6 +80,12 @@ class MemPoolAPI(ABC):
         daemon's height at the time the mempool was obtained."""
 
 
+NAMESPACE = "wallet_server"
+HISTOGRAM_BUCKETS = (
+    .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
+)
+
+
 class MemPool:
     """Representation of the daemon's mempool.
 
@@ -107,6 +114,9 @@ class MemPool:
         self.lock = asyncio.Lock()
         self.wakeup = asyncio.Event()
         self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
+        self.mempool_process_time_metric = Histogram(
+            "processed_mempool", "Time to process mempool and notify touched addresses", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
+        )
 
     async def _logging(self, synchronized_event):
         """Print regular logs of mempool stats."""
@@ -207,6 +217,7 @@ class MemPool:
     async def _refresh_hashes(self, synchronized_event):
         """Refresh our view of the daemon's mempool."""
         while True:
+            start = time.perf_counter()
             height = self.api.cached_height()
             hex_hashes = await self.api.mempool_hashes()
             if height != await self.api.height():
@@ -217,13 +228,16 @@
             synchronized_event.set()
             synchronized_event.clear()
             await self.api.on_mempool(touched, height)
+            timed_out = False
             try:
                 # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
                 await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
             except asyncio.TimeoutError:
-                pass
+                timed_out = True
             finally:
                 self.wakeup.clear()
+            duration = time.perf_counter() - start - (0 if not timed_out else self.refresh_secs)
+            self.mempool_process_time_metric.observe(duration)
 
     async def _process_mempool(self, all_hashes):
         # Re-sync with the new set of hashes
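
For context, here is a minimal, self-contained sketch of the timing pattern the patch adds to `_refresh_hashes`: time one pass of the loop with `time.perf_counter()`, subtract the full `refresh_secs` wait when the wakeup timed out (so the histogram records processing time rather than the polling interval), and record the result with `Histogram.observe()`. The `example` namespace, the `processed_mempool_sketch` metric name, and the `refresh_loop_sketch` coroutine are illustrative stand-ins, not part of the patch; the buckets mirror `HISTOGRAM_BUCKETS` above.

```python
import asyncio
import time

from prometheus_client import Histogram, generate_latest

# Same bucket boundaries as HISTOGRAM_BUCKETS in the patch.
BUCKETS = (
    .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)

# Illustrative metric; the patch itself registers wallet_server_processed_mempool.
sketch_metric = Histogram(
    "processed_mempool_sketch", "Time to process one refresh pass (illustration)",
    namespace="example", buckets=BUCKETS
)


async def refresh_loop_sketch(refresh_secs: float = 0.1, iterations: int = 3):
    """Mirror the timing logic the patch adds to MemPool._refresh_hashes."""
    wakeup = asyncio.Event()
    for _ in range(iterations):
        start = time.perf_counter()
        await asyncio.sleep(0.01)  # stand-in for fetching hashes and processing the mempool
        timed_out = False
        try:
            # Wait up to refresh_secs; a broadcast would set `wakeup` and end the wait early.
            await asyncio.wait_for(wakeup.wait(), timeout=refresh_secs)
        except asyncio.TimeoutError:
            timed_out = True
        finally:
            wakeup.clear()
        # When the wait simply timed out, subtract the whole refresh_secs so the
        # observed duration reflects processing time, not the idle polling interval.
        duration = time.perf_counter() - start - (refresh_secs if timed_out else 0)
        sketch_metric.observe(duration)


if __name__ == "__main__":
    asyncio.run(refresh_loop_sketch())
    # Exposition output includes example_processed_mempool_sketch_bucket/_count/_sum.
    print(generate_latest().decode())
```

The `generate_latest()` call at the end just prints the Prometheus exposition text so the recorded samples can be inspected without wiring up an HTTP exporter.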