block processor threadpool

This commit is contained in:
Jack Robison 2020-02-25 14:15:40 -05:00
parent 506582aa2b
commit 31f22122e8
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -1,6 +1,7 @@
import time import time
import asyncio import asyncio
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
import lbry import lbry
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
@ -145,6 +146,7 @@ class BlockProcessor:
self.blocks_event = asyncio.Event() self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.executor = ThreadPoolExecutor(1)
# Meta # Meta
self.next_cache_check = 0 self.next_cache_check = 0
@ -165,6 +167,7 @@ class BlockProcessor:
self.state_lock = asyncio.Lock() self.state_lock = asyncio.Lock()
self.search_cache = {} self.search_cache = {}
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -173,7 +176,7 @@ class BlockProcessor:
# consistent and not being updated elsewhere. # consistent and not being updated elsewhere.
async def run_in_thread_locked(): async def run_in_thread_locked():
async with self.state_lock: async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args) return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args)
return await asyncio.shield(run_in_thread_locked()) return await asyncio.shield(run_in_thread_locked())
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
@ -657,6 +660,7 @@ class BlockProcessor:
self.logger.info('flushing to DB for a clean shutdown...') self.logger.info('flushing to DB for a clean shutdown...')
await self.flush(True) await self.flush(True)
self.db.close() self.db.close()
self.executor.shutdown(wait=True)
def force_chain_reorg(self, count): def force_chain_reorg(self, count):
"""Force a reorg of the given number of blocks. """Force a reorg of the given number of blocks.