diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index dac7b0f85..9c6f10682 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1,6 +1,7 @@ import time import asyncio from struct import pack, unpack +from concurrent.futures.thread import ThreadPoolExecutor import lbry from lbry.schema.claim import Claim @@ -145,6 +146,7 @@ class BlockProcessor: self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.logger = class_logger(__name__, self.__class__.__name__) + self.executor = ThreadPoolExecutor(1) # Meta self.next_cache_check = 0 @@ -165,6 +167,7 @@ class BlockProcessor: self.state_lock = asyncio.Lock() self.search_cache = {} + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -173,7 +176,7 @@ class BlockProcessor: # consistent and not being updated elsewhere. async def run_in_thread_locked(): async with self.state_lock: - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args) return await asyncio.shield(run_in_thread_locked()) async def check_and_advance_blocks(self, raw_blocks): @@ -657,6 +660,7 @@ class BlockProcessor: self.logger.info('flushing to DB for a clean shutdown...') await self.flush(True) self.db.close() + self.executor.shutdown(wait=True) def force_chain_reorg(self, count): """Force a reorg of the given number of blocks. diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 0ce465389..cdcbbe50a 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -20,7 +20,7 @@ from collections import namedtuple from functools import partial from glob import glob from struct import pack, unpack - +from concurrent.futures.thread import ThreadPoolExecutor import attr from lbry.wallet.server import util @@ -63,6 +63,7 @@ class LevelDB: self.logger = util.class_logger(__name__, self.__class__.__name__) self.env = env self.coin = env.coin + self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) # Setup block header size handlers if self.coin.STATIC_BLOCK_HEADERS: @@ -139,6 +140,7 @@ class LevelDB: def close(self): self.utxo_db.close() self.history.close_db() + self.executor.shutdown(wait=True) async def open_for_compacting(self): await self._open_dbs(True, True) @@ -406,7 +408,7 @@ class LevelDB: return self.headers_file.read(offset, size), disk_count return b'', 0 - return await asyncio.get_event_loop().run_in_executor(None, read_headers) + return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) def fs_tx_hash(self, tx_num): """Return a par (tx_hash, tx_height) for the given tx number. @@ -445,7 +447,7 @@ class LevelDB: return [fs_tx_hash(tx_num) for tx_num in tx_nums] while True: - history = await asyncio.get_event_loop().run_in_executor(None, read_history) + history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) if all(hash is not None for hash, height in history): return history self.logger.warning(f'limited_history: tx hash ' @@ -611,7 +613,7 @@ class LevelDB: return utxos while True: - utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos) + utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos) if all(utxo.tx_hash is not None for utxo in utxos): return utxos self.logger.warning(f'all_utxos: tx hash not ' @@ -664,5 +666,5 @@ class LevelDB: return hashX, value return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs] - hashX_pairs = await asyncio.get_event_loop().run_in_executor(None, lookup_hashXs) - return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos, hashX_pairs) + hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs) + return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index de959379e..8816d2ad3 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -6,12 +6,13 @@ # and warranty status of this software. """Mempool handling.""" - +import os import asyncio import itertools import time from abc import ABC, abstractmethod from collections import defaultdict +from concurrent.futures.thread import ThreadPoolExecutor import attr @@ -105,6 +106,7 @@ class MemPool: # Prevents mempool refreshes during fee histogram calculation self.lock = asyncio.Lock() self.wakeup = asyncio.Event() + self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1)) async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -121,12 +123,15 @@ class MemPool: await synchronized_event.wait() async def _refresh_histogram(self, synchronized_event): - while True: - await synchronized_event.wait() - async with self.lock: - # Threaded as can be expensive - await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000) - await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) + try: + while True: + await synchronized_event.wait() + async with self.lock: + # Threaded as can be expensive + await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000) + await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) + finally: + self.executor.shutdown(wait=True) def _update_histogram(self, bin_size): # Build a histogram by fee rate @@ -289,7 +294,7 @@ class MemPool: return txs # Thread this potentially slow operation so as not to block - tx_map = await asyncio.get_event_loop().run_in_executor(None, deserialize_txs) + tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs) # Determine all prevouts not in the mempool, and fetch the # UTXO information from the database. Failed prevout lookups