diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 0ce465389..cdcbbe50a 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -20,7 +20,7 @@ from collections import namedtuple from functools import partial from glob import glob from struct import pack, unpack - +from concurrent.futures.thread import ThreadPoolExecutor import attr from lbry.wallet.server import util @@ -63,6 +63,7 @@ class LevelDB: self.logger = util.class_logger(__name__, self.__class__.__name__) self.env = env self.coin = env.coin + self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) # Setup block header size handlers if self.coin.STATIC_BLOCK_HEADERS: @@ -139,6 +140,7 @@ class LevelDB: def close(self): self.utxo_db.close() self.history.close_db() + self.executor.shutdown(wait=True) async def open_for_compacting(self): await self._open_dbs(True, True) @@ -406,7 +408,7 @@ class LevelDB: return self.headers_file.read(offset, size), disk_count return b'', 0 - return await asyncio.get_event_loop().run_in_executor(None, read_headers) + return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) def fs_tx_hash(self, tx_num): """Return a par (tx_hash, tx_height) for the given tx number. @@ -445,7 +447,7 @@ class LevelDB: return [fs_tx_hash(tx_num) for tx_num in tx_nums] while True: - history = await asyncio.get_event_loop().run_in_executor(None, read_history) + history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) if all(hash is not None for hash, height in history): return history self.logger.warning(f'limited_history: tx hash ' @@ -611,7 +613,7 @@ class LevelDB: return utxos while True: - utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos) + utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos) if all(utxo.tx_hash is not None for utxo in utxos): return utxos self.logger.warning(f'all_utxos: tx hash not ' @@ -664,5 +666,5 @@ class LevelDB: return hashX, value return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs] - hashX_pairs = await asyncio.get_event_loop().run_in_executor(None, lookup_hashXs) - return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos, hashX_pairs) + hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs) + return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs)