flush in advance/rollback methods

This commit is contained in:
Jack Robison 2022-01-31 15:36:15 -05:00
parent 28f25538a3
commit a4880c1cf0
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -252,9 +252,10 @@ class BlockProcessor:
total_start = time.perf_counter() total_start = time.perf_counter()
try: try:
for block in blocks: for block in blocks:
if self._stopping:
return
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread(self.advance_block, block) await self.run_in_thread_with_lock(self.advance_block, block)
await self.flush()
self.logger.info("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start) self.logger.info("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
if self.height == self.coin.nExtendedClaimExpirationForkHeight: if self.height == self.coin.nExtendedClaimExpirationForkHeight:
@ -289,7 +290,7 @@ class BlockProcessor:
try: try:
assert count > 0, count assert count > 0, count
for _ in range(count): for _ in range(count):
await self.backup_block() await self.run_in_thread_with_lock(self.backup_block)
self.logger.info(f'backed up to height {self.height:,d}') self.logger.info(f'backed up to height {self.height:,d}')
if self.env.cache_all_claim_txos: if self.env.cache_all_claim_txos:
@ -311,19 +312,6 @@ class BlockProcessor:
'resetting the prefetcher') 'resetting the prefetcher')
await self.prefetcher.reset_height(self.height) await self.prefetcher.reset_height(self.height)
async def flush(self):
save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit
def flush():
self.db.write_db_state()
if save_undo:
self.db.prefix_db.commit(self.height, self.tip)
else:
self.db.prefix_db.unsafe_commit()
self.clear_after_advance_or_reorg()
self.db.assert_db_state()
await self.run_in_thread_with_lock(flush)
def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]): spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]):
try: try:
@ -1415,7 +1403,6 @@ class BlockProcessor:
self.db.headers.append(block.header) self.db.headers.append(block.header)
self.tip = self.coin.header_hash(block.header) self.tip = self.coin.header_hash(block.header)
self.db.fs_height = self.height self.db.fs_height = self.height
self.db.fs_tx_count = self.tx_count self.db.fs_tx_count = self.tx_count
self.db.hist_flush_count += 1 self.db.hist_flush_count += 1
@ -1430,6 +1417,17 @@ class BlockProcessor:
self.db.last_flush = now self.db.last_flush = now
self.db.write_db_state() self.db.write_db_state()
# flush the changes
save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit
self.db.write_db_state()
if save_undo:
self.db.prefix_db.commit(self.height, self.tip)
else:
self.db.prefix_db.unsafe_commit()
self.clear_after_advance_or_reorg()
self.db.assert_db_state()
def clear_after_advance_or_reorg(self): def clear_after_advance_or_reorg(self):
self.txo_to_claim.clear() self.txo_to_claim.clear()
self.claim_hash_to_txo.clear() self.claim_hash_to_txo.clear()
@ -1463,7 +1461,7 @@ class BlockProcessor:
self.pending_support_amount_change.clear() self.pending_support_amount_change.clear()
self.touched_hashXs.clear() self.touched_hashXs.clear()
async def backup_block(self): def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0
touched_and_deleted = self.db.prefix_db.touched_or_deleted.get(self.height) touched_and_deleted = self.db.prefix_db.touched_or_deleted.get(self.height)
self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims) self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims)
@ -1514,13 +1512,12 @@ class BlockProcessor:
self.db.last_flush = now self.db.last_flush = now
self.db.last_flush_tx_count = self.db.fs_tx_count self.db.last_flush_tx_count = self.db.fs_tx_count
def rollback(): # rollback
self.db.prefix_db.rollback(self.height + 1, reverted_block_hash) self.db.prefix_db.rollback(self.height + 1, reverted_block_hash)
self.db.es_sync_height = self.height self.db.es_sync_height = self.height
self.db.write_db_state() self.db.write_db_state()
self.db.prefix_db.unsafe_commit() self.db.prefix_db.unsafe_commit()
await self.run_in_thread_with_lock(rollback)
self.clear_after_advance_or_reorg() self.clear_after_advance_or_reorg()
self.db.assert_db_state() self.db.assert_db_state()