threadpool

This commit is contained in:
Jack Robison 2021-07-22 17:33:54 -04:00 committed by Victor Shyba
parent 991d4f8859
commit 05b6bdb8f6
5 changed files with 17 additions and 31 deletions

View file

@ -182,7 +182,6 @@ class BlockProcessor:
self.daemon = daemon self.daemon = daemon
self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.coin = env.coin self.coin = env.coin
if env.coin.NET == 'mainnet': if env.coin.NET == 'mainnet':
self.ledger = Ledger self.ledger = Ledger
@ -281,7 +280,7 @@ class BlockProcessor:
# consistent and not being updated elsewhere. # consistent and not being updated elsewhere.
async def run_in_thread_locked(): async def run_in_thread_locked():
async with self.state_lock: async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args) return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.shield(run_in_thread_locked()) return await asyncio.shield(run_in_thread_locked())
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
@ -1421,4 +1420,4 @@ class BlockProcessor:
# Shut down block processing # Shut down block processing
self.logger.info('closing the DB for a clean shutdown...') self.logger.info('closing the DB for a clean shutdown...')
self.db.close() self.db.close()
self.executor.shutdown(wait=True) # self.executor.shutdown(wait=True)

View file

@ -11,7 +11,6 @@
import asyncio import asyncio
import array import array
import os
import time import time
import typing import typing
import struct import struct
@ -21,26 +20,20 @@ import base64
from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List
from functools import partial from functools import partial
from asyncio import sleep from asyncio import sleep
from bisect import bisect_right, bisect_left from bisect import bisect_right
from collections import defaultdict from collections import defaultdict
from glob import glob
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from lbry.utils import LRUCacheWithMetrics from lbry.utils import LRUCacheWithMetrics
from lbry.schema.url import URL from lbry.schema.url import URL
from lbry.wallet.server import util from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN from lbry.wallet.server.hash import hash_to_hex_str
from lbry.wallet.server.tx import TxInput from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class from lbry.wallet.server.storage import db_class
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, RevertableOpStack
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
from lbry.wallet.server.db.claimtrie import length_encoded_name
from lbry.wallet.transaction import OutputScript from lbry.wallet.transaction import OutputScript
from lbry.schema.claim import Claim, guess_stream_type from lbry.schema.claim import Claim, guess_stream_type
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@ -111,7 +104,6 @@ class LevelDB:
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.env = env self.env = env
self.coin = env.coin self.coin = env.coin
self.executor = None
self.logger.info(f'switching current directory to {env.db_dir}') self.logger.info(f'switching current directory to {env.db_dir}')
@ -361,7 +353,7 @@ class LevelDB:
return resolved_stream, resolved_channel return resolved_stream, resolved_channel
async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]:
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url) return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url)
def _fs_get_claim_by_hash(self, claim_hash): def _fs_get_claim_by_hash(self, claim_hash):
claim = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash)) claim = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash))
@ -375,7 +367,7 @@ class LevelDB:
async def fs_getclaimbyid(self, claim_id): async def fs_getclaimbyid(self, claim_id):
return await asyncio.get_event_loop().run_in_executor( return await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id) None, self._fs_get_claim_by_hash, bytes.fromhex(claim_id)
) )
def get_claim_txo_amount(self, claim_hash: bytes) -> Optional[int]: def get_claim_txo_amount(self, claim_hash: bytes) -> Optional[int]:
@ -682,7 +674,7 @@ class LevelDB:
for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False) for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False)
) )
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
self.tx_counts = array.array('I', tx_counts) self.tx_counts = array.array('I', tx_counts)
@ -698,7 +690,7 @@ class LevelDB:
start = time.perf_counter() start = time.perf_counter()
self.logger.info("loading txids") self.logger.info("loading txids")
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1] assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
self.total_transactions = txids self.total_transactions = txids
self.transaction_num_mapping = { self.transaction_num_mapping = {
@ -716,16 +708,13 @@ class LevelDB:
header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False) header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False)
] ]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) headers = await asyncio.get_event_loop().run_in_executor(None, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers self.headers = headers
async def open_dbs(self): async def open_dbs(self):
if self.db: if self.db:
return return
if self.executor is None:
self.executor = ThreadPoolExecutor(1)
assert self.db is None assert self.db is None
self.db = self.db_class(f'lbry-{self.env.db_engine}', True) self.db = self.db_class(f'lbry-{self.env.db_engine}', True)
if self.db.is_new: if self.db.is_new:
@ -771,8 +760,6 @@ class LevelDB:
def close(self): def close(self):
self.db.close() self.db.close()
self.executor.shutdown(wait=True)
self.executor = None
# Header merkle cache # Header merkle cache
@ -986,7 +973,7 @@ class LevelDB:
return tx_infos return tx_infos
async def fs_transactions(self, txids): async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
async def fs_block_hashes(self, height, count): async def fs_block_hashes(self, height, count):
if height + count > len(self.headers): if height + count > len(self.headers):
@ -1011,7 +998,7 @@ class LevelDB:
limit to None to get them all. limit to None to get them all.
""" """
while True: while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, self.read_history, hashX, limit) history = await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit)
if history is not None: if history is not None:
return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history] return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history]
self.logger.warning(f'limited_history: tx hash ' self.logger.warning(f'limited_history: tx hash '
@ -1094,7 +1081,7 @@ class LevelDB:
return utxos return utxos
while True: while True:
utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos) utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos)
if all(utxo.tx_hash is not None for utxo in utxos): if all(utxo.tx_hash is not None for utxo in utxos):
return utxos return utxos
self.logger.warning(f'all_utxos: tx hash not ' self.logger.warning(f'all_utxos: tx hash not '
@ -1116,4 +1103,4 @@ class LevelDB:
if utxo_value: if utxo_value:
utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount)) utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount))
return utxos return utxos
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos) return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos)

View file

@ -69,7 +69,7 @@ class Server:
def run(self): def run(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(1) executor = ThreadPoolExecutor(4)
loop.set_default_executor(executor) loop.set_default_executor(executor)
def __exit(): def __exit():

View file

@ -22,7 +22,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids) txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')

View file

@ -913,7 +913,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids) txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')