executors

This commit is contained in:
Jack Robison 2022-01-16 14:00:48 -05:00
parent ba6b985d71
commit 47305e7446
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 20 additions and 18 deletions

View file

@ -464,7 +464,8 @@ class CommandTestCase(IntegrationTestCase):
async def confirm_tx(self, txid, ledger=None): async def confirm_tx(self, txid, ledger=None):
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """ """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
await self.on_transaction_id(txid, ledger) await self.on_transaction_id(txid, ledger)
await asyncio.wait([self.generate(1), self.on_transaction_id(txid, ledger)], timeout=5) on_tx = self.on_transaction_id(txid, ledger)
await asyncio.wait([self.generate(1), on_tx], timeout=5)
return txid return txid
async def on_transaction_dict(self, tx): async def on_transaction_dict(self, tx):

View file

@ -86,11 +86,11 @@ class BlockProcessor:
self.env = env self.env = env
self.state_lock = asyncio.Lock() self.state_lock = asyncio.Lock()
self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) self.daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self.db = HubDB( self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
max_open_files=env.db_max_open_files max_open_files=env.db_max_open_files, executor=self._chain_executor
) )
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.coin = env.coin self.coin = env.coin
if env.coin.NET == 'mainnet': if env.coin.NET == 'mainnet':

View file

@ -20,15 +20,15 @@ class BlockchainReader:
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.cancellable_tasks = [] self.cancellable_tasks = []
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
self.db = HubDB( self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1 secondary_name=secondary_name, max_open_files=-1, executor=self._executor
) )
self.last_state: typing.Optional[DBState] = None self.last_state: typing.Optional[DBState] = None
self._refresh_interval = 0.1 self._refresh_interval = 0.1
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
def _detect_changes(self): def _detect_changes(self):
try: try:
@ -241,7 +241,6 @@ class BlockchainReaderServer(BlockchainReader):
pass pass
finally: finally:
loop.run_until_complete(self.stop()) loop.run_until_complete(self.stop())
executor.shutdown(True)
async def start_prometheus(self): async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port: if not self.prometheus_server and self.env.prometheus_port:

View file

@ -11,6 +11,7 @@ from functools import partial
from asyncio import sleep from asyncio import sleep
from bisect import bisect_right from bisect import bisect_right
from collections import defaultdict from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from lbry.error import ResolveCensoredError from lbry.error import ResolveCensoredError
from lbry.schema.result import Censor from lbry.schema.result import Censor
@ -43,9 +44,10 @@ class HubDB:
def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 256): secondary_name: str = '', max_open_files: int = 256, executor: ThreadPoolExecutor = None):
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.coin = coin self.coin = coin
self._executor = executor
self._db_dir = db_dir self._db_dir = db_dir
self._cache_MB = cache_MB self._cache_MB = cache_MB
@ -332,7 +334,7 @@ class HubDB:
return ExpandedResolveResult(resolved_stream, resolved_channel, repost, reposted_channel) return ExpandedResolveResult(resolved_stream, resolved_channel, repost, reposted_channel)
async def resolve(self, url) -> ExpandedResolveResult: async def resolve(self, url) -> ExpandedResolveResult:
return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url) return await asyncio.get_event_loop().run_in_executor(self._executor, self._resolve, url)
def _fs_get_claim_by_hash(self, claim_hash): def _fs_get_claim_by_hash(self, claim_hash):
claim = self.get_cached_claim_txo(claim_hash) claim = self.get_cached_claim_txo(claim_hash)
@ -417,7 +419,7 @@ class HubDB:
self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes( self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes(
self.filtering_channel_hashes self.filtering_channel_hashes
) )
await asyncio.get_event_loop().run_in_executor(None, reload) await asyncio.get_event_loop().run_in_executor(self._executor, reload)
def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]):
streams, channels = {}, {} streams, channels = {}, {}
@ -741,7 +743,7 @@ class HubDB:
v.tx_count for v in self.prefix_db.tx_count.iterate(include_key=False, fill_cache=False) v.tx_count for v in self.prefix_db.tx_count.iterate(include_key=False, fill_cache=False)
] ]
tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts) tx_counts = await asyncio.get_event_loop().run_in_executor(self._executor, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
self.tx_counts = array.array('I', tx_counts) self.tx_counts = array.array('I', tx_counts)
@ -762,7 +764,7 @@ class HubDB:
self.txo_to_claim.clear() self.txo_to_claim.clear()
start = time.perf_counter() start = time.perf_counter()
self.logger.info("loading claims") self.logger.info("loading claims")
await asyncio.get_event_loop().run_in_executor(None, read_claim_txos) await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
ts = time.perf_counter() - start ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4)) self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
@ -777,7 +779,7 @@ class HubDB:
) )
] ]
headers = await asyncio.get_event_loop().run_in_executor(None, get_headers) headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers self.headers = headers
@ -789,7 +791,7 @@ class HubDB:
self.total_transactions.clear() self.total_transactions.clear()
self.tx_num_mapping.clear() self.tx_num_mapping.clear()
start = time.perf_counter() start = time.perf_counter()
self.total_transactions.extend(await asyncio.get_event_loop().run_in_executor(None, _read_tx_hashes)) self.total_transactions.extend(await asyncio.get_event_loop().run_in_executor(self._executor, _read_tx_hashes))
self.tx_num_mapping = { self.tx_num_mapping = {
tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions) tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions)
} }
@ -936,7 +938,7 @@ class HubDB:
return x return x
if disk_count: if disk_count:
return await asyncio.get_event_loop().run_in_executor(None, read_headers), disk_count return await asyncio.get_event_loop().run_in_executor(self._executor, read_headers), disk_count
return b'', 0 return b'', 0
def fs_tx_hash(self, tx_num): def fs_tx_hash(self, tx_num):
@ -1029,7 +1031,7 @@ class HubDB:
transactions. By default returns at most 1000 entries. Set transactions. By default returns at most 1000 entries. Set
limit to None to get them all. limit to None to get them all.
""" """
return await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit) return await asyncio.get_event_loop().run_in_executor(self._executor, self.read_history, hashX, limit)
# -- Undo information # -- Undo information
@ -1119,7 +1121,7 @@ class HubDB:
return utxos return utxos
while True: while True:
utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos) utxos = await asyncio.get_event_loop().run_in_executor(self._executor, read_utxos)
if all(utxo.tx_hash is not None for utxo in utxos): if all(utxo.tx_hash is not None for utxo in utxos):
return utxos return utxos
self.logger.warning(f'all_utxos: tx hash not ' self.logger.warning(f'all_utxos: tx hash not '
@ -1144,11 +1146,11 @@ class HubDB:
if utxo_value: if utxo_value:
utxo_append((hashX, utxo_value.amount)) utxo_append((hashX, utxo_value.amount))
return utxos return utxos
return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos) return await asyncio.get_event_loop().run_in_executor(self._executor, lookup_utxos)
async def get_trending_notifications(self, height: int): async def get_trending_notifications(self, height: int):
def read_trending(): def read_trending():
return { return {
k.claim_hash: v for k, v in self.prefix_db.trending_notification.iterate((height,)) k.claim_hash: v for k, v in self.prefix_db.trending_notification.iterate((height,))
} }
return await asyncio.get_event_loop().run_in_executor(None, read_trending) return await asyncio.get_event_loop().run_in_executor(self._executor, read_trending)