diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index dac7b0f85..9c6f10682 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1,6 +1,7 @@ import time import asyncio from struct import pack, unpack +from concurrent.futures.thread import ThreadPoolExecutor import lbry from lbry.schema.claim import Claim @@ -145,6 +146,7 @@ class BlockProcessor: self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.logger = class_logger(__name__, self.__class__.__name__) + self.executor = ThreadPoolExecutor(1) # Meta self.next_cache_check = 0 @@ -165,6 +167,7 @@ class BlockProcessor: self.state_lock = asyncio.Lock() self.search_cache = {} + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -173,7 +176,7 @@ class BlockProcessor: # consistent and not being updated elsewhere. async def run_in_thread_locked(): async with self.state_lock: - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args) return await asyncio.shield(run_in_thread_locked()) async def check_and_advance_blocks(self, raw_blocks): @@ -657,6 +660,7 @@ class BlockProcessor: self.logger.info('flushing to DB for a clean shutdown...') await self.flush(True) self.db.close() + self.executor.shutdown(wait=True) def force_chain_reorg(self, count): """Force a reorg of the given number of blocks.