forked from LBRYCommunity/lbry-sdk
wake up mempool on broadcast
This commit is contained in:
parent
91846939f6
commit
d66c801350
2 changed files with 11 additions and 5 deletions
|
@ -11,7 +11,6 @@ import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import time
|
import time
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from asyncio import Lock, sleep
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -104,7 +103,8 @@ class MemPool:
|
||||||
self.refresh_secs = refresh_secs
|
self.refresh_secs = refresh_secs
|
||||||
self.log_status_secs = log_status_secs
|
self.log_status_secs = log_status_secs
|
||||||
# Prevents mempool refreshes during fee histogram calculation
|
# Prevents mempool refreshes during fee histogram calculation
|
||||||
self.lock = Lock()
|
self.lock = asyncio.Lock()
|
||||||
|
self.wakeup = asyncio.Event()
|
||||||
|
|
||||||
async def _logging(self, synchronized_event):
|
async def _logging(self, synchronized_event):
|
||||||
"""Print regular logs of mempool stats."""
|
"""Print regular logs of mempool stats."""
|
||||||
|
@ -117,7 +117,7 @@ class MemPool:
|
||||||
while True:
|
while True:
|
||||||
self.logger.info(f'{len(self.txs):,d} txs '
|
self.logger.info(f'{len(self.txs):,d} txs '
|
||||||
f'touching {len(self.hashXs):,d} addresses')
|
f'touching {len(self.hashXs):,d} addresses')
|
||||||
await sleep(self.log_status_secs)
|
await asyncio.sleep(self.log_status_secs)
|
||||||
await synchronized_event.wait()
|
await synchronized_event.wait()
|
||||||
|
|
||||||
async def _refresh_histogram(self, synchronized_event):
|
async def _refresh_histogram(self, synchronized_event):
|
||||||
|
@ -126,7 +126,7 @@ class MemPool:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
# Threaded as can be expensive
|
# Threaded as can be expensive
|
||||||
await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000)
|
await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000)
|
||||||
await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
|
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
|
||||||
|
|
||||||
def _update_histogram(self, bin_size):
|
def _update_histogram(self, bin_size):
|
||||||
# Build a histogram by fee rate
|
# Build a histogram by fee rate
|
||||||
|
@ -212,7 +212,12 @@ class MemPool:
|
||||||
synchronized_event.set()
|
synchronized_event.set()
|
||||||
synchronized_event.clear()
|
synchronized_event.clear()
|
||||||
await self.api.on_mempool(touched, height)
|
await self.api.on_mempool(touched, height)
|
||||||
await sleep(self.refresh_secs)
|
try:
|
||||||
|
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self.wakeup.clear()
|
||||||
|
|
||||||
async def _process_mempool(self, all_hashes):
|
async def _process_mempool(self, all_hashes):
|
||||||
# Re-sync with the new set of hashes
|
# Re-sync with the new set of hashes
|
||||||
|
|
|
@ -567,6 +567,7 @@ class SessionManager:
|
||||||
|
|
||||||
async def broadcast_transaction(self, raw_tx):
|
async def broadcast_transaction(self, raw_tx):
|
||||||
hex_hash = await self.daemon.broadcast_transaction(raw_tx)
|
hex_hash = await self.daemon.broadcast_transaction(raw_tx)
|
||||||
|
self.mempool.wakeup.set()
|
||||||
self.txs_sent += 1
|
self.txs_sent += 1
|
||||||
return hex_hash
|
return hex_hash
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue