diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index daa858a89..837ce6d67 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -91,7 +91,6 @@ class BlockProcessor: max_open_files=env.db_max_open_files ) self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') - self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync') self.shutdown_event = asyncio.Event() self.coin = env.coin if env.coin.NET == 'mainnet': @@ -181,19 +180,6 @@ class BlockProcessor: self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} - async def claim_producer(self): - if self.db.db_height <= 1: - return - - for claim_hash in self.removed_claims_to_send_es: - yield 'delete', claim_hash.hex() - - to_update = await asyncio.get_event_loop().run_in_executor( - self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es - ) - for claim in to_update: - yield 'update', claim - async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -1655,7 +1641,6 @@ class BlockProcessor: finally: # Shut down block processing self.logger.info('closing the DB for a clean shutdown...') - self._sync_reader_executor.shutdown(wait=True) self._chain_executor.shutdown(wait=True) self.db.close() diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 8e395824f..729357b43 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -15,7 +15,7 @@ from lbry.prometheus import PrometheusServer class BlockchainReader: - def __init__(self, env, secondary_name: str): + def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'): self.env = env self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.shutdown_event = asyncio.Event() @@ -27,6 +27,8 @@ class BlockchainReader: ) self.last_state: typing.Optional[DBState] = None self._refresh_interval = 0.1 + self._lock = asyncio.Lock() + self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) def _detect_changes(self): try: @@ -72,13 +74,16 @@ class BlockchainReader: # print("reader rewound to ", self.last_state.height) async def poll_for_changes(self): - await asyncio.get_event_loop().run_in_executor(None, self._detect_changes) + await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) async def refresh_blocks_forever(self, synchronized: asyncio.Event): self.log.warning("start refresh blocks forever") while True: try: - await self.poll_for_changes() + async with self._lock: + await self.poll_for_changes() + except asyncio.CancelledError: + raise except: self.log.exception("boom") raise @@ -102,7 +107,7 @@ class BlockchainReader: class BlockchainReaderServer(BlockchainReader): def __init__(self, env): - super().__init__(env, 'lbry-reader') + super().__init__(env, 'lbry-reader', thread_workers=1, thread_prefix='hub-worker') self.history_cache = {} self.resolve_outputs_cache = {} self.resolve_cache = {} @@ -209,21 +214,21 @@ class BlockchainReaderServer(BlockchainReader): async def stop(self): self.status_server.stop() - for task in reversed(self.cancellable_tasks): - task.cancel() - await asyncio.wait(self.cancellable_tasks) + async with self._lock: + for task in reversed(self.cancellable_tasks): + task.cancel() + await asyncio.wait(self.cancellable_tasks) self.session_manager.search_index.stop() self.db.close() if self.prometheus_server: await self.prometheus_server.stop() self.prometheus_server = None - self.shutdown_event.set() await self.daemon.close() + self._executor.shutdown(wait=True) + self.shutdown_event.set() def run(self): loop = asyncio.get_event_loop() - executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker') - loop.set_default_executor(executor) def __exit(): raise SystemExit() diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 7024d4eb5..5f1076240 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -27,7 +27,7 @@ class ElasticWriter(BlockchainReader): VERSION = 1 def __init__(self, env): - super().__init__(env, 'lbry-elastic-writer') + super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer') # self._refresh_interval = 0.1 self._task = None self.index = self.env.es_index_prefix + 'claims' @@ -72,7 +72,7 @@ class ElasticWriter(BlockchainReader): self._last_wrote_block_hash = info.get('block_hash', None) async def read_es_height(self): - await asyncio.get_event_loop().run_in_executor(None, self._read_es_height) + await asyncio.get_event_loop().run_in_executor(self._executor, self._read_es_height) def write_es_height(self, height: int, block_hash: str): with open(self._es_info_path, 'w') as f: @@ -294,13 +294,15 @@ class ElasticWriter(BlockchainReader): await _start_cancellable(self.refresh_blocks_forever) async def stop(self, delete_index=False): - while self.cancellable_tasks: - t = self.cancellable_tasks.pop() - if not t.done(): - t.cancel() + async with self._lock: + while self.cancellable_tasks: + t = self.cancellable_tasks.pop() + if not t.done(): + t.cancel() if delete_index: await self.delete_index() await self.stop_index() + self._executor.shutdown(wait=True) def run(self): loop = asyncio.get_event_loop()