threadpools for block processor and es sync reader

This commit is contained in:
Jack Robison 2021-10-14 15:40:20 -04:00
parent 22b43a2b01
commit 6416d8ce9c
4 changed files with 43 additions and 71 deletions

View file

@ -3,6 +3,7 @@ import asyncio
import typing
from bisect import bisect_right
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
from prometheus_client import Gauge, Histogram
from collections import defaultdict
@ -203,6 +204,8 @@ class BlockProcessor:
self.env = env
self.db = db
self.daemon = daemon
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event
self.coin = env.coin
@ -299,7 +302,11 @@ class BlockProcessor:
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim
async def run_in_thread_with_lock(self, func, *args):
@ -310,13 +317,12 @@ class BlockProcessor:
# consistent and not being updated elsewhere.
async def run_in_thread_locked():
async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread_locked())
@staticmethod
async def run_in_thread(func, *args):
async def run_in_thread(self, func, *args):
async def run_in_thread():
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread())
async def check_and_advance_blocks(self, raw_blocks):
@ -1746,5 +1752,6 @@ class BlockProcessor:
self.status_server.stop()
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True)
self.db.close()
# self.executor.shutdown(wait=True)

View file

@ -696,45 +696,36 @@ class LevelDB:
yield meta
batch.clear()
async def claims_producer(self, claim_hashes: Set[bytes]):
loop = asyncio.get_event_loop()
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
def produce_claims(claims):
batch = []
_results = []
for claim_hash in claim_hashes:
if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
for claim_hash in claims:
if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
batch.sort(key=lambda x: x.tx_hash)
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
_results.append(_meta)
return _results
if claim_hashes:
results = await loop.run_in_executor(None, produce_claims, claim_hashes)
for meta in results:
yield meta
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)

View file

@ -69,7 +69,7 @@ class Server:
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers)
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit():

View file

@ -633,7 +633,7 @@ class SessionManager:
self.mempool_statuses.pop(hashX, None)
await asyncio.get_event_loop().run_in_executor(
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
)
if touched or new_touched or (height_changed and self.mempool_statuses):
@ -775,10 +775,9 @@ class LBRYSessionManager(SessionManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.query_executor = None
self.websocket = None
# self.metrics = ServerLoadData()
self.metrics_loop = None
# self.metrics_loop = None
self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self)
@ -795,12 +794,6 @@ class LBRYSessionManager(SessionManager):
async def start_other(self):
self.running = True
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1)
else:
self.query_executor = ProcessPoolExecutor(
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
)
if self.websocket is not None:
await self.websocket.start()
@ -808,7 +801,6 @@ class LBRYSessionManager(SessionManager):
self.running = False
if self.websocket is not None:
await self.websocket.stop()
self.query_executor.shutdown()
class LBRYElectrumX(SessionBase):
@ -971,24 +963,6 @@ class LBRYElectrumX(SessionBase):
# else:
# return APICallMetrics(query_name)
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
except asyncio.CancelledError:
raise
except Exception:
log.exception("dear devs, please handle this exception better")
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
return base64.b64encode(result).decode()
finally:
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
# async def run_and_cache_query(self, query_name, kwargs):
# start = time.perf_counter()