wait for writing to finish before closing the db

This commit is contained in:
Jack Robison 2022-01-19 10:10:03 -05:00
parent 81458b75e4
commit b1441d4247
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -180,6 +180,9 @@ class BlockProcessor:
self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {} self.pending_transactions: Dict[int, bytes] = {}
self._stopping = False
self._ready_to_stop = asyncio.Event()
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -197,8 +200,6 @@ class BlockProcessor:
return await asyncio.shield(run_in_thread()) return await asyncio.shield(run_in_thread())
async def check_mempool(self): async def check_mempool(self):
if self.db.prefix_db.closed:
return
current_mempool = { current_mempool = {
k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate() k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate()
} }
@ -1552,30 +1553,35 @@ class BlockProcessor:
async def process_blocks_and_mempool_forever(self): async def process_blocks_and_mempool_forever(self):
"""Loop forever processing blocks as they arrive.""" """Loop forever processing blocks as they arrive."""
while True: try:
if self.height == self.daemon.cached_height(): while not self._stopping:
if not self._caught_up_event.is_set(): if self.height == self.daemon.cached_height():
await self._first_caught_up() if not self._caught_up_event.is_set():
self._caught_up_event.set() await self._first_caught_up()
try: self._caught_up_event.set()
await asyncio.wait_for(self.blocks_event.wait(), 0.1)
except asyncio.TimeoutError:
pass
self.blocks_event.clear()
blocks = self.prefetcher.get_prefetched_blocks()
if not blocks:
try: try:
await self.check_mempool() await asyncio.wait_for(self.blocks_event.wait(), 0.1)
self.db.prefix_db.unsafe_commit() except asyncio.TimeoutError:
except Exception: pass
self.logger.exception("error while updating mempool txs") self.blocks_event.clear()
raise blocks = self.prefetcher.get_prefetched_blocks()
else: if self._stopping:
try: break
await self.check_and_advance_blocks(blocks) if not blocks:
except Exception: try:
self.logger.exception("error while processing txs") await self.check_mempool()
raise self.db.prefix_db.unsafe_commit()
except Exception:
self.logger.exception("error while updating mempool txs")
raise
else:
try:
await self.check_and_advance_blocks(blocks)
except Exception:
self.logger.exception("error while processing txs")
raise
finally:
self._ready_to_stop.set()
async def _es_caught_up(self): async def _es_caught_up(self):
self.db.es_sync_height = self.height self.db.es_sync_height = self.height
@ -1645,6 +1651,7 @@ class BlockProcessor:
self.db.close() self.db.close()
async def start(self): async def start(self):
self._stopping = False
env = self.env env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.logger.info(f'software version: {lbry.__version__}') self.logger.info(f'software version: {lbry.__version__}')
@ -1662,6 +1669,8 @@ class BlockProcessor:
await _start_cancellable(self.fetch_and_process_blocks) await _start_cancellable(self.fetch_and_process_blocks)
async def stop(self): async def stop(self):
self._stopping = True
await self._ready_to_stop.wait()
for task in reversed(self.cancellable_tasks): for task in reversed(self.cancellable_tasks):
task.cancel() task.cancel()
await asyncio.wait(self.cancellable_tasks) await asyncio.wait(self.cancellable_tasks)