split flush from advance_block

This commit is contained in:
Jack Robison 2021-07-24 14:36:49 -04:00
parent 30b923b283
commit 5a01dbf269
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -302,7 +302,8 @@ class BlockProcessor:
try: try:
for block in blocks: for block in blocks:
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_block, block) await self.advance_block(block)
await self.flush()
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
# TODO: we shouldnt wait on the search index updating before advancing to the next block # TODO: we shouldnt wait on the search index updating before advancing to the next block
if not self.db.first_sync: if not self.db.first_sync:
@ -340,7 +341,10 @@ class BlockProcessor:
try: try:
assert count > 0, count assert count > 0, count
for _ in range(count): for _ in range(count):
await self.run_in_thread_with_lock(self.backup_block) await self.backup_block()
await self.flush()
self.logger.info(f'backed up to height {self.height:,d}')
for touched in self.touched_claims_to_send_es: for touched in self.touched_claims_to_send_es:
if not self.db.get_claim_txo(touched): if not self.db.get_claim_txo(touched):
self.removed_claims_to_send_es.add(touched) self.removed_claims_to_send_es.add(touched)
@ -375,6 +379,7 @@ class BlockProcessor:
async def flush(self): async def flush(self):
def flush(): def flush():
self.db.flush_dbs(self.flush_data()) self.db.flush_dbs(self.flush_data())
self.clear_after_advance_or_reorg()
await self.run_in_thread_with_lock(flush) await self.run_in_thread_with_lock(flush)
async def write_state(self): async def write_state(self):
@ -1124,7 +1129,7 @@ class BlockProcessor:
self.touched_claims_to_send_es.update(self.touched_claim_hashes) self.touched_claims_to_send_es.update(self.touched_claim_hashes)
self.removed_claims_to_send_es.update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes)
def advance_block(self, block): async def advance_block(self, block):
height = self.height + 1 height = self.height + 1
# print("advance ", height) # print("advance ", height)
# Use local vars for speed in the loops # Use local vars for speed in the loops
@ -1240,9 +1245,6 @@ class BlockProcessor:
self.db.headers.append(block.header) self.db.headers.append(block.header)
self.tip = self.coin.header_hash(block.header) self.tip = self.coin.header_hash(block.header)
self.db.flush_dbs(self.flush_data())
self.clear_after_advance_or_reorg()
def clear_after_advance_or_reorg(self): def clear_after_advance_or_reorg(self):
self.db_op_stack.clear() self.db_op_stack.clear()
self.txo_to_claim.clear() self.txo_to_claim.clear()
@ -1273,8 +1275,8 @@ class BlockProcessor:
self.pending_reposted.clear() self.pending_reposted.clear()
self.pending_channel_counts.clear() self.pending_channel_counts.clear()
def backup_block(self): async def backup_block(self):
self.db.assert_flushed(self.flush_data()) # self.db.assert_flushed(self.flush_data())
self.logger.info("backup block %i", self.height) self.logger.info("backup block %i", self.height)
# Check and update self.tip # Check and update self.tip
undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height) undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
@ -1298,9 +1300,6 @@ class BlockProcessor:
# self.touched can include other addresses which is # self.touched can include other addresses which is
# harmless, but remove None. # harmless, but remove None.
self.touched_hashXs.discard(None) self.touched_hashXs.discard(None)
self.db.flush_backup(self.flush_data())
self.clear_after_advance_or_reorg()
self.logger.info(f'backed up to height {self.height:,d}')
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]: def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
hashX = self.coin.hashX_from_script(txout.pk_script) hashX = self.coin.hashX_from_script(txout.pk_script)