forked from LBRYCommunity/lbry-sdk
threadpools for block processor and es sync reader
This commit is contained in:
parent
a4ad1bb0a9
commit
eeaf9a72e2
4 changed files with 43 additions and 71 deletions
|
@ -3,6 +3,7 @@ import asyncio
|
|||
import typing
|
||||
from bisect import bisect_right
|
||||
from struct import pack, unpack
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
|
||||
from prometheus_client import Gauge, Histogram
|
||||
from collections import defaultdict
|
||||
|
@ -203,6 +204,8 @@ class BlockProcessor:
|
|||
self.env = env
|
||||
self.db = db
|
||||
self.daemon = daemon
|
||||
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
|
||||
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
|
||||
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
|
||||
self.shutdown_event = shutdown_event
|
||||
self.coin = env.coin
|
||||
|
@ -299,7 +302,11 @@ class BlockProcessor:
|
|||
|
||||
for claim_hash in self.removed_claims_to_send_es:
|
||||
yield 'delete', claim_hash.hex()
|
||||
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
|
||||
|
||||
to_update = await asyncio.get_event_loop().run_in_executor(
|
||||
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
|
||||
)
|
||||
for claim in to_update:
|
||||
yield 'update', claim
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
|
@ -310,13 +317,12 @@ class BlockProcessor:
|
|||
# consistent and not being updated elsewhere.
|
||||
async def run_in_thread_locked():
|
||||
async with self.state_lock:
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
|
||||
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
|
||||
return await asyncio.shield(run_in_thread_locked())
|
||||
|
||||
@staticmethod
|
||||
async def run_in_thread(func, *args):
|
||||
async def run_in_thread(self, func, *args):
|
||||
async def run_in_thread():
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
|
||||
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
|
||||
return await asyncio.shield(run_in_thread())
|
||||
|
||||
async def check_and_advance_blocks(self, raw_blocks):
|
||||
|
@ -1746,5 +1752,6 @@ class BlockProcessor:
|
|||
self.status_server.stop()
|
||||
# Shut down block processing
|
||||
self.logger.info('closing the DB for a clean shutdown...')
|
||||
self._sync_reader_executor.shutdown(wait=True)
|
||||
self._chain_executor.shutdown(wait=True)
|
||||
self.db.close()
|
||||
# self.executor.shutdown(wait=True)
|
||||
|
|
|
@ -696,45 +696,36 @@ class LevelDB:
|
|||
yield meta
|
||||
batch.clear()
|
||||
|
||||
async def claims_producer(self, claim_hashes: Set[bytes]):
|
||||
loop = asyncio.get_event_loop()
|
||||
def claims_producer(self, claim_hashes: Set[bytes]):
|
||||
batch = []
|
||||
results = []
|
||||
|
||||
def produce_claims(claims):
|
||||
batch = []
|
||||
_results = []
|
||||
for claim_hash in claim_hashes:
|
||||
if claim_hash not in self.claim_to_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
name = self.claim_to_txo[claim_hash].normalized_name
|
||||
if not self.prefix_db.claim_takeover.get(name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
claim_txo = self.claim_to_txo.get(claim_hash)
|
||||
if not claim_txo:
|
||||
continue
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if claim:
|
||||
batch.append(claim)
|
||||
|
||||
for claim_hash in claims:
|
||||
if claim_hash not in self.claim_to_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
name = self.claim_to_txo[claim_hash].normalized_name
|
||||
if not self.prefix_db.claim_takeover.get(name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
claim_txo = self.claim_to_txo.get(claim_hash)
|
||||
if not claim_txo:
|
||||
continue
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if claim:
|
||||
batch.append(claim)
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
|
||||
for claim in batch:
|
||||
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if _meta:
|
||||
_results.append(_meta)
|
||||
return _results
|
||||
|
||||
if claim_hashes:
|
||||
results = await loop.run_in_executor(None, produce_claims, claim_hashes)
|
||||
|
||||
for meta in results:
|
||||
yield meta
|
||||
for claim in batch:
|
||||
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if _meta:
|
||||
results.append(_meta)
|
||||
return results
|
||||
|
||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||
activated = defaultdict(list)
|
||||
|
|
|
@ -69,7 +69,7 @@ class Server:
|
|||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
executor = ThreadPoolExecutor(self.env.max_query_workers)
|
||||
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
|
||||
loop.set_default_executor(executor)
|
||||
|
||||
def __exit():
|
||||
|
|
|
@ -633,7 +633,7 @@ class SessionManager:
|
|||
self.mempool_statuses.pop(hashX, None)
|
||||
|
||||
await asyncio.get_event_loop().run_in_executor(
|
||||
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
)
|
||||
|
||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
||||
|
@ -775,10 +775,9 @@ class LBRYSessionManager(SessionManager):
|
|||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.query_executor = None
|
||||
self.websocket = None
|
||||
# self.metrics = ServerLoadData()
|
||||
self.metrics_loop = None
|
||||
# self.metrics_loop = None
|
||||
self.running = False
|
||||
if self.env.websocket_host is not None and self.env.websocket_port is not None:
|
||||
self.websocket = AdminWebSocket(self)
|
||||
|
@ -795,12 +794,6 @@ class LBRYSessionManager(SessionManager):
|
|||
|
||||
async def start_other(self):
|
||||
self.running = True
|
||||
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
|
||||
self.query_executor = ThreadPoolExecutor(max_workers=1)
|
||||
else:
|
||||
self.query_executor = ProcessPoolExecutor(
|
||||
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
|
||||
)
|
||||
if self.websocket is not None:
|
||||
await self.websocket.start()
|
||||
|
||||
|
@ -808,7 +801,6 @@ class LBRYSessionManager(SessionManager):
|
|||
self.running = False
|
||||
if self.websocket is not None:
|
||||
await self.websocket.stop()
|
||||
self.query_executor.shutdown()
|
||||
|
||||
|
||||
class LBRYElectrumX(SessionBase):
|
||||
|
@ -971,24 +963,6 @@ class LBRYElectrumX(SessionBase):
|
|||
# else:
|
||||
# return APICallMetrics(query_name)
|
||||
|
||||
async def run_in_executor(self, query_name, func, kwargs):
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
self.session_mgr.pending_query_metric.inc()
|
||||
result = await asyncio.get_running_loop().run_in_executor(
|
||||
self.session_mgr.query_executor, func, kwargs
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
log.exception("dear devs, please handle this exception better")
|
||||
self.session_mgr.db_error_metric.inc()
|
||||
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
|
||||
else:
|
||||
return base64.b64encode(result).decode()
|
||||
finally:
|
||||
self.session_mgr.pending_query_metric.dec()
|
||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
# async def run_and_cache_query(self, query_name, kwargs):
|
||||
# start = time.perf_counter()
|
||||
|
|
Loading…
Reference in a new issue