diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index bd11e3d4e..a8190e827 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -180,6 +180,9 @@ class BlockProcessor: self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} + self._stopping = False + self._ready_to_stop = asyncio.Event() + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -197,8 +200,6 @@ class BlockProcessor: return await asyncio.shield(run_in_thread()) async def check_mempool(self): - if self.db.prefix_db.closed: - return current_mempool = { k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate() } @@ -1552,30 +1553,35 @@ class BlockProcessor: async def process_blocks_and_mempool_forever(self): """Loop forever processing blocks as they arrive.""" - while True: - if self.height == self.daemon.cached_height(): - if not self._caught_up_event.is_set(): - await self._first_caught_up() - self._caught_up_event.set() - try: - await asyncio.wait_for(self.blocks_event.wait(), 0.1) - except asyncio.TimeoutError: - pass - self.blocks_event.clear() - blocks = self.prefetcher.get_prefetched_blocks() - if not blocks: + try: + while not self._stopping: + if self.height == self.daemon.cached_height(): + if not self._caught_up_event.is_set(): + await self._first_caught_up() + self._caught_up_event.set() try: - await self.check_mempool() - self.db.prefix_db.unsafe_commit() - except Exception: - self.logger.exception("error while updating mempool txs") - raise - else: - try: - await self.check_and_advance_blocks(blocks) - except Exception: - self.logger.exception("error while processing txs") - raise + await asyncio.wait_for(self.blocks_event.wait(), 0.1) + except asyncio.TimeoutError: + pass + self.blocks_event.clear() + blocks = self.prefetcher.get_prefetched_blocks() + if self._stopping: + break + if not blocks: + try: + await self.check_mempool() + self.db.prefix_db.unsafe_commit() + except Exception: + self.logger.exception("error while updating mempool txs") + raise + else: + try: + await self.check_and_advance_blocks(blocks) + except Exception: + self.logger.exception("error while processing txs") + raise + finally: + self._ready_to_stop.set() async def _es_caught_up(self): self.db.es_sync_height = self.height @@ -1645,6 +1651,7 @@ class BlockProcessor: self.db.close() async def start(self): + self._stopping = False env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() self.logger.info(f'software version: {lbry.__version__}') @@ -1662,6 +1669,8 @@ class BlockProcessor: await _start_cancellable(self.fetch_and_process_blocks) async def stop(self): + self._stopping = True + await self._ready_to_stop.wait() for task in reversed(self.cancellable_tasks): task.cancel() await asyncio.wait(self.cancellable_tasks)