From 05b6bdb8f69e43ca3f8d2ff06be6dc43a38e6b23 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 22 Jul 2021 17:33:54 -0400 Subject: [PATCH] threadpool --- lbry/wallet/server/block_processor.py | 5 +-- lbry/wallet/server/leveldb.py | 37 ++++++------------- lbry/wallet/server/server.py | 2 +- .../test_blockchain_reorganization.py | 2 +- .../blockchain/test_resolve_command.py | 2 +- 5 files changed, 17 insertions(+), 31 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 2075eae23..5d8fb7599 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -182,7 +182,6 @@ class BlockProcessor: self.daemon = daemon self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.shutdown_event = shutdown_event - self.coin = env.coin if env.coin.NET == 'mainnet': self.ledger = Ledger @@ -281,7 +280,7 @@ class BlockProcessor: # consistent and not being updated elsewhere. async def run_in_thread_locked(): async with self.state_lock: - return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args) + return await asyncio.get_event_loop().run_in_executor(None, func, *args) return await asyncio.shield(run_in_thread_locked()) async def check_and_advance_blocks(self, raw_blocks): @@ -1421,4 +1420,4 @@ class BlockProcessor: # Shut down block processing self.logger.info('closing the DB for a clean shutdown...') self.db.close() - self.executor.shutdown(wait=True) + # self.executor.shutdown(wait=True) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 1225e40c3..7ba1347a4 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -11,7 +11,6 @@ import asyncio import array -import os import time import typing import struct @@ -21,26 +20,20 @@ import base64 from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from functools import partial from asyncio import sleep -from bisect import bisect_right, bisect_left +from bisect import bisect_right from collections import defaultdict -from glob import glob -from struct import pack, unpack -from concurrent.futures.thread import ThreadPoolExecutor from lbry.utils import LRUCacheWithMetrics from lbry.schema.url import URL from lbry.wallet.server import util -from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN +from lbry.wallet.server.hash import hash_to_hex_str from lbry.wallet.server.tx import TxInput from lbry.wallet.server.merkle import Merkle, MerkleCache -from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.storage import db_class -from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, RevertableOpStack from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE -from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue -from lbry.wallet.server.db.claimtrie import length_encoded_name +from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue from lbry.wallet.transaction import OutputScript from lbry.schema.claim import Claim, guess_stream_type from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger @@ -111,7 +104,6 @@ class LevelDB: self.logger = util.class_logger(__name__, self.__class__.__name__) self.env = env self.coin = env.coin - self.executor = None self.logger.info(f'switching current directory to {env.db_dir}') @@ -361,7 +353,7 @@ class LevelDB: return resolved_stream, resolved_channel async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: - return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url) + return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url) def _fs_get_claim_by_hash(self, claim_hash): claim = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash)) @@ -375,7 +367,7 @@ class LevelDB: async def fs_getclaimbyid(self, claim_id): return await asyncio.get_event_loop().run_in_executor( - self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id) + None, self._fs_get_claim_by_hash, bytes.fromhex(claim_id) ) def get_claim_txo_amount(self, claim_hash: bytes) -> Optional[int]: @@ -682,7 +674,7 @@ class LevelDB: for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False) ) - tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) + tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts) assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" self.tx_counts = array.array('I', tx_counts) @@ -698,7 +690,7 @@ class LevelDB: start = time.perf_counter() self.logger.info("loading txids") - txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids) + txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1] self.total_transactions = txids self.transaction_num_mapping = { @@ -716,16 +708,13 @@ class LevelDB: header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False) ] - headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) + headers = await asyncio.get_event_loop().run_in_executor(None, get_headers) assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" self.headers = headers async def open_dbs(self): if self.db: return - if self.executor is None: - self.executor = ThreadPoolExecutor(1) - assert self.db is None self.db = self.db_class(f'lbry-{self.env.db_engine}', True) if self.db.is_new: @@ -771,8 +760,6 @@ class LevelDB: def close(self): self.db.close() - self.executor.shutdown(wait=True) - self.executor = None # Header merkle cache @@ -986,7 +973,7 @@ class LevelDB: return tx_infos async def fs_transactions(self, txids): - return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) + return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids) async def fs_block_hashes(self, height, count): if height + count > len(self.headers): @@ -1011,7 +998,7 @@ class LevelDB: limit to None to get them all. """ while True: - history = await asyncio.get_event_loop().run_in_executor(self.executor, self.read_history, hashX, limit) + history = await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit) if history is not None: return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history] self.logger.warning(f'limited_history: tx hash ' @@ -1094,7 +1081,7 @@ class LevelDB: return utxos while True: - utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos) + utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos) if all(utxo.tx_hash is not None for utxo in utxos): return utxos self.logger.warning(f'all_utxos: tx hash not ' @@ -1116,4 +1103,4 @@ class LevelDB: if utxo_value: utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount)) return utxos - return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos) + return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos) diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index 24d8c395d..21572feca 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -69,7 +69,7 @@ class Server: def run(self): loop = asyncio.get_event_loop() - executor = ThreadPoolExecutor(1) + executor = ThreadPoolExecutor(4) loop.set_default_executor(executor) def __exit(): diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index b7fef197d..72724a68e 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -22,7 +22,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) - txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids) + txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) txs = await bp.db.fs_transactions(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 702d39a1d..10b894b24 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -913,7 +913,7 @@ class ResolveAfterReorg(BaseResolveTestCase): self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) - txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids) + txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) txs = await bp.db.fs_transactions(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')