threadpool

This commit is contained in:
Jack Robison 2021-07-22 17:33:54 -04:00
parent 077ca987f7
commit 813e506b68
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 17 additions and 31 deletions

View file

@ -182,7 +182,6 @@ class BlockProcessor:
self.daemon = daemon
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event
self.coin = env.coin
if env.coin.NET == 'mainnet':
self.ledger = Ledger
@ -281,7 +280,7 @@ class BlockProcessor:
# consistent and not being updated elsewhere.
async def run_in_thread_locked():
async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args)
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.shield(run_in_thread_locked())
async def check_and_advance_blocks(self, raw_blocks):
@ -1421,4 +1420,4 @@ class BlockProcessor:
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self.db.close()
self.executor.shutdown(wait=True)
# self.executor.shutdown(wait=True)

View file

@ -11,7 +11,6 @@
import asyncio
import array
import os
import time
import typing
import struct
@ -21,26 +20,20 @@ import base64
from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List
from functools import partial
from asyncio import sleep
from bisect import bisect_right, bisect_left
from bisect import bisect_right
from collections import defaultdict
from glob import glob
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from lbry.utils import LRUCacheWithMetrics
from lbry.schema.url import URL
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN
from lbry.wallet.server.hash import hash_to_hex_str
from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, RevertableOpStack
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
from lbry.wallet.server.db.claimtrie import length_encoded_name
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
from lbry.wallet.transaction import OutputScript
from lbry.schema.claim import Claim, guess_stream_type
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@ -111,7 +104,6 @@ class LevelDB:
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.env = env
self.coin = env.coin
self.executor = None
self.logger.info(f'switching current directory to {env.db_dir}')
@ -361,7 +353,7 @@ class LevelDB:
return resolved_stream, resolved_channel
async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]:
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url)
return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url)
def _fs_get_claim_by_hash(self, claim_hash):
claim = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash))
@ -375,7 +367,7 @@ class LevelDB:
async def fs_getclaimbyid(self, claim_id):
return await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id)
None, self._fs_get_claim_by_hash, bytes.fromhex(claim_id)
)
def get_claim_txo_amount(self, claim_hash: bytes) -> Optional[int]:
@ -682,7 +674,7 @@ class LevelDB:
for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False)
)
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
self.tx_counts = array.array('I', tx_counts)
@ -698,7 +690,7 @@ class LevelDB:
start = time.perf_counter()
self.logger.info("loading txids")
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
self.total_transactions = txids
self.transaction_num_mapping = {
@ -716,16 +708,13 @@ class LevelDB:
header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False)
]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
headers = await asyncio.get_event_loop().run_in_executor(None, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers
async def open_dbs(self):
if self.db:
return
if self.executor is None:
self.executor = ThreadPoolExecutor(1)
assert self.db is None
self.db = self.db_class(f'lbry-{self.env.db_engine}', True)
if self.db.is_new:
@ -771,8 +760,6 @@ class LevelDB:
def close(self):
self.db.close()
self.executor.shutdown(wait=True)
self.executor = None
# Header merkle cache
@ -986,7 +973,7 @@ class LevelDB:
return tx_infos
async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
@ -1011,7 +998,7 @@ class LevelDB:
limit to None to get them all.
"""
while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, self.read_history, hashX, limit)
history = await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit)
if history is not None:
return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history]
self.logger.warning(f'limited_history: tx hash '
@ -1094,7 +1081,7 @@ class LevelDB:
return utxos
while True:
utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos)
if all(utxo.tx_hash is not None for utxo in utxos):
return utxos
self.logger.warning(f'all_utxos: tx hash not '
@ -1116,4 +1103,4 @@ class LevelDB:
if utxo_value:
utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount))
return utxos
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos)
return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos)

View file

@ -69,7 +69,7 @@ class Server:
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(1)
executor = ThreadPoolExecutor(4)
loop.set_default_executor(executor)
def __exit():

View file

@ -22,7 +22,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')

View file

@ -913,7 +913,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')