From e22bc01cbdf71647ad5ba35a8e0aa92e185ff0dd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 16:21:43 -0400 Subject: [PATCH] re-add wakeup event, add address history metric --- lbry/wallet/server/mempool.py | 8 +++++++- lbry/wallet/server/session.py | 11 ++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index fd045ced7..7ca887892 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -232,7 +232,13 @@ class MemPool: await self.api.on_mempool(touched, height) duration = time.perf_counter() - start self.mempool_process_time_metric.observe(duration) - await asyncio.sleep(self.refresh_secs) + try: + # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) + await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) + except asyncio.TimeoutError: + pass + finally: + self.wakeup.clear() async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 98f7b6fd5..810b11dba 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -123,6 +123,7 @@ HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) + class SessionManager: """Holds global state about all sessions.""" @@ -160,6 +161,10 @@ class SessionManager: "clients", "Number of connections received per client version", namespace=NAMESPACE, labelnames=("version",) ) + address_history_metric = Histogram( + "address_history", "Time to fetch an address history", + namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + ) def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', shutdown_event: asyncio.Event): @@ -924,11 +929,13 @@ class LBRYElectrumX(SessionBase): return async def send_history_notification(alias, hashX): + start = time.perf_counter() if len(alias) == 64: method = 'blockchain.scripthash.subscribe' else: method = 'blockchain.address.subscribe' status = await self.address_status(hashX) + self.session_mgr.address_history_metric.observe(time.perf_counter() - start) await self.send_notification(method, (alias, status)) touched = touched.intersection(self.hashX_subs) @@ -941,9 +948,6 @@ class LBRYElectrumX(SessionBase): es = '' if len(touched) == 1 else 'es' self.logger.info(f'notified of {len(touched):,d} address{es}') - - - def get_metrics_or_placeholder_for_api(self, query_name): """ Do not hold on to a reference to the metrics returned by this method past an `await` or @@ -1487,6 +1491,7 @@ class LBRYElectrumX(SessionBase): try: hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) self.txs_sent += 1 + self.mempool.wakeup.set() self.logger.info(f'sent tx: {hex_hash}') return hex_hash except DaemonError as e: