re-add wakeup event, add address history metric

This commit is contained in:
Jack Robison 2020-06-15 16:21:43 -04:00
parent 6c28713a4c
commit e22bc01cbd
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 15 additions and 4 deletions

View file

@ -232,7 +232,13 @@ class MemPool:
await self.api.on_mempool(touched, height) await self.api.on_mempool(touched, height)
duration = time.perf_counter() - start duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration) self.mempool_process_time_metric.observe(duration)
await asyncio.sleep(self.refresh_secs) try:
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
except asyncio.TimeoutError:
pass
finally:
self.wakeup.clear()
async def _process_mempool(self, all_hashes): async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes # Re-sync with the new set of hashes

View file

@ -123,6 +123,7 @@ HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
) )
class SessionManager: class SessionManager:
"""Holds global state about all sessions.""" """Holds global state about all sessions."""
@ -160,6 +161,10 @@ class SessionManager:
"clients", "Number of connections received per client version", "clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",) namespace=NAMESPACE, labelnames=("version",)
) )
address_history_metric = Histogram(
"address_history", "Time to fetch an address history",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event): shutdown_event: asyncio.Event):
@ -924,11 +929,13 @@ class LBRYElectrumX(SessionBase):
return return
async def send_history_notification(alias, hashX): async def send_history_notification(alias, hashX):
start = time.perf_counter()
if len(alias) == 64: if len(alias) == 64:
method = 'blockchain.scripthash.subscribe' method = 'blockchain.scripthash.subscribe'
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
status = await self.address_status(hashX) status = await self.address_status(hashX)
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
await self.send_notification(method, (alias, status)) await self.send_notification(method, (alias, status))
touched = touched.intersection(self.hashX_subs) touched = touched.intersection(self.hashX_subs)
@ -941,9 +948,6 @@ class LBRYElectrumX(SessionBase):
es = '' if len(touched) == 1 else 'es' es = '' if len(touched) == 1 else 'es'
self.logger.info(f'notified of {len(touched):,d} address{es}') self.logger.info(f'notified of {len(touched):,d} address{es}')
def get_metrics_or_placeholder_for_api(self, query_name): def get_metrics_or_placeholder_for_api(self, query_name):
""" Do not hold on to a reference to the metrics """ Do not hold on to a reference to the metrics
returned by this method past an `await` or returned by this method past an `await` or
@ -1487,6 +1491,7 @@ class LBRYElectrumX(SessionBase):
try: try:
hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
self.txs_sent += 1 self.txs_sent += 1
self.mempool.wakeup.set()
self.logger.info(f'sent tx: {hex_hash}') self.logger.info(f'sent tx: {hex_hash}')
return hex_hash return hex_hash
except DaemonError as e: except DaemonError as e: