mempool threadpool

This commit is contained in:
Jack Robison 2020-02-25 14:17:33 -05:00
parent d94c40e371
commit 7945e1ea3c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -12,6 +12,7 @@ import itertools
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
import attr import attr
@ -105,6 +106,7 @@ class MemPool:
# Prevents mempool refreshes during fee histogram calculation # Prevents mempool refreshes during fee histogram calculation
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.wakeup = asyncio.Event() self.wakeup = asyncio.Event()
self.executor = ThreadPoolExecutor(4)
async def _logging(self, synchronized_event): async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats.""" """Print regular logs of mempool stats."""
@ -121,12 +123,15 @@ class MemPool:
await synchronized_event.wait() await synchronized_event.wait()
async def _refresh_histogram(self, synchronized_event): async def _refresh_histogram(self, synchronized_event):
while True: try:
await synchronized_event.wait() while True:
async with self.lock: await synchronized_event.wait()
# Threaded as can be expensive async with self.lock:
await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000) # Threaded as can be expensive
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000)
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
finally:
self.executor.shutdown(wait=True)
def _update_histogram(self, bin_size): def _update_histogram(self, bin_size):
# Build a histogram by fee rate # Build a histogram by fee rate
@ -289,7 +294,7 @@ class MemPool:
return txs return txs
# Thread this potentially slow operation so as not to block # Thread this potentially slow operation so as not to block
tx_map = await asyncio.get_event_loop().run_in_executor(None, deserialize_txs) tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs)
# Determine all prevouts not in the mempool, and fetch the # Determine all prevouts not in the mempool, and fetch the
# UTXO information from the database. Failed prevout lookups # UTXO information from the database. Failed prevout lookups