diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index f2c0bcf9a..523386951 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -3,6 +3,7 @@ import asyncio import typing from bisect import bisect_right from struct import pack, unpack +from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple from prometheus_client import Gauge, Histogram from collections import defaultdict @@ -203,6 +204,8 @@ class BlockProcessor: self.env = env self.db = db self.daemon = daemon + self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') + self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync') self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.shutdown_event = shutdown_event self.coin = env.coin @@ -299,7 +302,11 @@ class BlockProcessor: for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() - async for claim in self.db.claims_producer(self.touched_claims_to_send_es): + + to_update = await asyncio.get_event_loop().run_in_executor( + self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es + ) + for claim in to_update: yield 'update', claim async def run_in_thread_with_lock(self, func, *args): @@ -310,13 +317,12 @@ class BlockProcessor: # consistent and not being updated elsewhere. async def run_in_thread_locked(): async with self.state_lock: - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) return await asyncio.shield(run_in_thread_locked()) - @staticmethod - async def run_in_thread(func, *args): + async def run_in_thread(self, func, *args): async def run_in_thread(): - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) return await asyncio.shield(run_in_thread()) async def check_and_advance_blocks(self, raw_blocks): @@ -1746,5 +1752,6 @@ class BlockProcessor: self.status_server.stop() # Shut down block processing self.logger.info('closing the DB for a clean shutdown...') + self._sync_reader_executor.shutdown(wait=True) + self._chain_executor.shutdown(wait=True) self.db.close() - # self.executor.shutdown(wait=True) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 16176b3b5..0d02f9cbb 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -696,45 +696,36 @@ class LevelDB: yield meta batch.clear() - async def claims_producer(self, claim_hashes: Set[bytes]): - loop = asyncio.get_event_loop() + def claims_producer(self, claim_hashes: Set[bytes]): + batch = [] + results = [] - def produce_claims(claims): - batch = [] - _results = [] + for claim_hash in claim_hashes: + if claim_hash not in self.claim_to_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + name = self.claim_to_txo[claim_hash].normalized_name + if not self.prefix_db.claim_takeover.get(name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + claim_txo = self.claim_to_txo.get(claim_hash) + if not claim_txo: + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: + batch.append(claim) - for claim_hash in claims: - if claim_hash not in self.claim_to_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - name = self.claim_to_txo[claim_hash].normalized_name - if not self.prefix_db.claim_takeover.get(name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - claim_txo = self.claim_to_txo.get(claim_hash) - if not claim_txo: - continue - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) + batch.sort(key=lambda x: x.tx_hash) - batch.sort(key=lambda x: x.tx_hash) - - for claim in batch: - _meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if _meta: - _results.append(_meta) - return _results - - if claim_hashes: - results = await loop.run_in_executor(None, produce_claims, claim_hashes) - - for meta in results: - yield meta + for claim in batch: + _meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if _meta: + results.append(_meta) + return results def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list) diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index 2a0a2111e..966d5c31e 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -69,7 +69,7 @@ class Server: def run(self): loop = asyncio.get_event_loop() - executor = ThreadPoolExecutor(self.env.max_query_workers) + executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker') loop.set_default_executor(executor) def __exit(): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index f49ea5225..dfd81f761 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -633,7 +633,7 @@ class SessionManager: self.mempool_statuses.pop(hashX, None) await asyncio.get_event_loop().run_in_executor( - None, touched.intersection_update, self.hashx_subscriptions_by_session.keys() + self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys() ) if touched or new_touched or (height_changed and self.mempool_statuses): @@ -775,10 +775,9 @@ class LBRYSessionManager(SessionManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.query_executor = None self.websocket = None # self.metrics = ServerLoadData() - self.metrics_loop = None + # self.metrics_loop = None self.running = False if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) @@ -795,12 +794,6 @@ class LBRYSessionManager(SessionManager): async def start_other(self): self.running = True - if self.env.max_query_workers is not None and self.env.max_query_workers == 0: - self.query_executor = ThreadPoolExecutor(max_workers=1) - else: - self.query_executor = ProcessPoolExecutor( - max_workers=self.env.max_query_workers or max(os.cpu_count(), 4) - ) if self.websocket is not None: await self.websocket.start() @@ -808,7 +801,6 @@ class LBRYSessionManager(SessionManager): self.running = False if self.websocket is not None: await self.websocket.stop() - self.query_executor.shutdown() class LBRYElectrumX(SessionBase): @@ -971,24 +963,6 @@ class LBRYElectrumX(SessionBase): # else: # return APICallMetrics(query_name) - async def run_in_executor(self, query_name, func, kwargs): - start = time.perf_counter() - try: - self.session_mgr.pending_query_metric.inc() - result = await asyncio.get_running_loop().run_in_executor( - self.session_mgr.query_executor, func, kwargs - ) - except asyncio.CancelledError: - raise - except Exception: - log.exception("dear devs, please handle this exception better") - self.session_mgr.db_error_metric.inc() - raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') - else: - return base64.b64encode(result).decode() - finally: - self.session_mgr.pending_query_metric.dec() - self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) # async def run_and_cache_query(self, query_name, kwargs): # start = time.perf_counter()