executors for each reader, fix shutdown

This commit is contained in:
Jack Robison 2022-01-15 13:44:24 -05:00
parent 98f8fd0556
commit 46bcc5d725
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 23 additions and 31 deletions

View file

@ -91,7 +91,6 @@ class BlockProcessor:
max_open_files=env.db_max_open_files
)
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.shutdown_event = asyncio.Event()
self.coin = env.coin
if env.coin.NET == 'mainnet':
@ -181,19 +180,6 @@ class BlockProcessor:
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {}
async def claim_producer(self):
if self.db.db_height <= 1:
return
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
@ -1655,7 +1641,6 @@ class BlockProcessor:
finally:
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True)
self.db.close()

View file

@ -15,7 +15,7 @@ from lbry.prometheus import PrometheusServer
class BlockchainReader:
def __init__(self, env, secondary_name: str):
def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'):
self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event()
@ -27,6 +27,8 @@ class BlockchainReader:
)
self.last_state: typing.Optional[DBState] = None
self._refresh_interval = 0.1
self._lock = asyncio.Lock()
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
def _detect_changes(self):
try:
@ -72,13 +74,16 @@ class BlockchainReader:
# print("reader rewound to ", self.last_state.height)
async def poll_for_changes(self):
await asyncio.get_event_loop().run_in_executor(None, self._detect_changes)
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
async def refresh_blocks_forever(self, synchronized: asyncio.Event):
self.log.warning("start refresh blocks forever")
while True:
try:
await self.poll_for_changes()
async with self._lock:
await self.poll_for_changes()
except asyncio.CancelledError:
raise
except:
self.log.exception("boom")
raise
@ -102,7 +107,7 @@ class BlockchainReader:
class BlockchainReaderServer(BlockchainReader):
def __init__(self, env):
super().__init__(env, 'lbry-reader')
super().__init__(env, 'lbry-reader', thread_workers=1, thread_prefix='hub-worker')
self.history_cache = {}
self.resolve_outputs_cache = {}
self.resolve_cache = {}
@ -209,21 +214,21 @@ class BlockchainReaderServer(BlockchainReader):
async def stop(self):
self.status_server.stop()
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
async with self._lock:
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
self.session_manager.search_index.stop()
self.db.close()
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
self.shutdown_event.set()
await self.daemon.close()
self._executor.shutdown(wait=True)
self.shutdown_event.set()
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit():
raise SystemExit()

View file

@ -27,7 +27,7 @@ class ElasticWriter(BlockchainReader):
VERSION = 1
def __init__(self, env):
super().__init__(env, 'lbry-elastic-writer')
super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer')
# self._refresh_interval = 0.1
self._task = None
self.index = self.env.es_index_prefix + 'claims'
@ -72,7 +72,7 @@ class ElasticWriter(BlockchainReader):
self._last_wrote_block_hash = info.get('block_hash', None)
async def read_es_height(self):
await asyncio.get_event_loop().run_in_executor(None, self._read_es_height)
await asyncio.get_event_loop().run_in_executor(self._executor, self._read_es_height)
def write_es_height(self, height: int, block_hash: str):
with open(self._es_info_path, 'w') as f:
@ -294,13 +294,15 @@ class ElasticWriter(BlockchainReader):
await _start_cancellable(self.refresh_blocks_forever)
async def stop(self, delete_index=False):
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
async with self._lock:
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
if delete_index:
await self.delete_index()
await self.stop_index()
self._executor.shutdown(wait=True)
def run(self):
loop = asyncio.get_event_loop()