diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 4a0a80f30..3ef6c5628 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -6,13 +6,11 @@ # and warranty status of this software. """Mempool handling.""" -import os import asyncio import itertools import time from abc import ABC, abstractmethod from collections import defaultdict -from concurrent.futures.thread import ThreadPoolExecutor from prometheus_client import Histogram import attr @@ -117,7 +115,6 @@ class MemPool: # Prevents mempool refreshes during fee histogram calculation self.lock = asyncio.Lock() self.wakeup = asyncio.Event() - self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1)) self.mempool_process_time_metric = mempool_process_time_metric async def _logging(self, synchronized_event): @@ -135,15 +132,11 @@ class MemPool: await synchronized_event.wait() async def _refresh_histogram(self, synchronized_event): - try: - while True: - await synchronized_event.wait() - async with self.lock: - # Threaded as can be expensive - await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000) - await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) - finally: - self.executor.shutdown(wait=True) + while True: + await synchronized_event.wait() + async with self.lock: + self._update_histogram(100_000) + await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) def _update_histogram(self, bin_size): # Build a histogram by fee rate @@ -286,30 +279,25 @@ class MemPool: hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) raw_txs = await self.api.raw_transactions(hex_hashes_iter) - def deserialize_txs(): # This function is pure - to_hashX = self.coin.hashX_from_script - deserializer = self.coin.DESERIALIZER + to_hashX = self.coin.hashX_from_script + deserializer = self.coin.DESERIALIZER - txs = {} - for hash, raw_tx in zip(hashes, raw_txs): - # The daemon may have evicted the tx from its - # mempool or it may have gotten in a block - if not raw_tx: - continue - tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() - # Convert the inputs and outputs into (hashX, value) pairs - # Drop generation-like inputs from MemPoolTx.prevouts - txin_pairs = tuple((txin.prev_hash, txin.prev_idx) - for txin in tx.inputs - if not txin.is_generation()) - txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) - for txout in tx.outputs) - txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs, - 0, tx_size) - return txs - - # Thread this potentially slow operation so as not to block - tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs) + tx_map = {} + for hash, raw_tx in zip(hashes, raw_txs): + # The daemon may have evicted the tx from its + # mempool or it may have gotten in a block + if not raw_tx: + continue + tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() + # Convert the inputs and outputs into (hashX, value) pairs + # Drop generation-like inputs from MemPoolTx.prevouts + txin_pairs = tuple((txin.prev_hash, txin.prev_idx) + for txin in tx.inputs + if not txin.is_generation()) + txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) + for txout in tx.outputs) + tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs, + 0, tx_size) # Determine all prevouts not in the mempool, and fetch the # UTXO information from the database. Failed prevout lookups