executors for each reader, fix shutdown

This commit is contained in:
Jack Robison 2022-01-15 13:44:24 -05:00
parent 4f4ecd64cc
commit 8a02796b37
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 23 additions and 31 deletions

View file

@ -91,7 +91,6 @@ class BlockProcessor:
max_open_files=env.db_max_open_files max_open_files=env.db_max_open_files
) )
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.coin = env.coin self.coin = env.coin
if env.coin.NET == 'mainnet': if env.coin.NET == 'mainnet':
@ -181,19 +180,6 @@ class BlockProcessor:
self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {} self.pending_transactions: Dict[int, bytes] = {}
async def claim_producer(self):
if self.db.db_height <= 1:
return
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -1655,7 +1641,6 @@ class BlockProcessor:
finally: finally:
# Shut down block processing # Shut down block processing
self.logger.info('closing the DB for a clean shutdown...') self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True) self._chain_executor.shutdown(wait=True)
self.db.close() self.db.close()

View file

@ -15,7 +15,7 @@ from lbry.prometheus import PrometheusServer
class BlockchainReader: class BlockchainReader:
def __init__(self, env, secondary_name: str): def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'):
self.env = env self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
@ -27,6 +27,8 @@ class BlockchainReader:
) )
self.last_state: typing.Optional[DBState] = None self.last_state: typing.Optional[DBState] = None
self._refresh_interval = 0.1 self._refresh_interval = 0.1
self._lock = asyncio.Lock()
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
def _detect_changes(self): def _detect_changes(self):
try: try:
@ -72,13 +74,16 @@ class BlockchainReader:
# print("reader rewound to ", self.last_state.height) # print("reader rewound to ", self.last_state.height)
async def poll_for_changes(self): async def poll_for_changes(self):
await asyncio.get_event_loop().run_in_executor(None, self._detect_changes) await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
async def refresh_blocks_forever(self, synchronized: asyncio.Event): async def refresh_blocks_forever(self, synchronized: asyncio.Event):
self.log.warning("start refresh blocks forever") self.log.warning("start refresh blocks forever")
while True: while True:
try: try:
async with self._lock:
await self.poll_for_changes() await self.poll_for_changes()
except asyncio.CancelledError:
raise
except: except:
self.log.exception("boom") self.log.exception("boom")
raise raise
@ -102,7 +107,7 @@ class BlockchainReader:
class BlockchainReaderServer(BlockchainReader): class BlockchainReaderServer(BlockchainReader):
def __init__(self, env): def __init__(self, env):
super().__init__(env, 'lbry-reader') super().__init__(env, 'lbry-reader', thread_workers=1, thread_prefix='hub-worker')
self.history_cache = {} self.history_cache = {}
self.resolve_outputs_cache = {} self.resolve_outputs_cache = {}
self.resolve_cache = {} self.resolve_cache = {}
@ -209,6 +214,7 @@ class BlockchainReaderServer(BlockchainReader):
async def stop(self): async def stop(self):
self.status_server.stop() self.status_server.stop()
async with self._lock:
for task in reversed(self.cancellable_tasks): for task in reversed(self.cancellable_tasks):
task.cancel() task.cancel()
await asyncio.wait(self.cancellable_tasks) await asyncio.wait(self.cancellable_tasks)
@ -217,13 +223,12 @@ class BlockchainReaderServer(BlockchainReader):
if self.prometheus_server: if self.prometheus_server:
await self.prometheus_server.stop() await self.prometheus_server.stop()
self.prometheus_server = None self.prometheus_server = None
self.shutdown_event.set()
await self.daemon.close() await self.daemon.close()
self._executor.shutdown(wait=True)
self.shutdown_event.set()
def run(self): def run(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit(): def __exit():
raise SystemExit() raise SystemExit()

View file

@ -27,7 +27,7 @@ class ElasticWriter(BlockchainReader):
VERSION = 1 VERSION = 1
def __init__(self, env): def __init__(self, env):
super().__init__(env, 'lbry-elastic-writer') super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer')
# self._refresh_interval = 0.1 # self._refresh_interval = 0.1
self._task = None self._task = None
self.index = self.env.es_index_prefix + 'claims' self.index = self.env.es_index_prefix + 'claims'
@ -72,7 +72,7 @@ class ElasticWriter(BlockchainReader):
self._last_wrote_block_hash = info.get('block_hash', None) self._last_wrote_block_hash = info.get('block_hash', None)
async def read_es_height(self): async def read_es_height(self):
await asyncio.get_event_loop().run_in_executor(None, self._read_es_height) await asyncio.get_event_loop().run_in_executor(self._executor, self._read_es_height)
def write_es_height(self, height: int, block_hash: str): def write_es_height(self, height: int, block_hash: str):
with open(self._es_info_path, 'w') as f: with open(self._es_info_path, 'w') as f:
@ -294,6 +294,7 @@ class ElasticWriter(BlockchainReader):
await _start_cancellable(self.refresh_blocks_forever) await _start_cancellable(self.refresh_blocks_forever)
async def stop(self, delete_index=False): async def stop(self, delete_index=False):
async with self._lock:
while self.cancellable_tasks: while self.cancellable_tasks:
t = self.cancellable_tasks.pop() t = self.cancellable_tasks.pop()
if not t.done(): if not t.done():
@ -301,6 +302,7 @@ class ElasticWriter(BlockchainReader):
if delete_index: if delete_index:
await self.delete_index() await self.delete_index()
await self.stop_index() await self.stop_index()
self._executor.shutdown(wait=True)
def run(self): def run(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()