diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 929f772cd..01f245e2e 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -302,7 +302,8 @@ class BlockProcessor: try: for block in blocks: start = time.perf_counter() - await self.run_in_thread_with_lock(self.advance_block, block) + await self.advance_block(block) + await self.flush() self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) # TODO: we shouldnt wait on the search index updating before advancing to the next block if not self.db.first_sync: @@ -340,7 +341,10 @@ class BlockProcessor: try: assert count > 0, count for _ in range(count): - await self.run_in_thread_with_lock(self.backup_block) + await self.backup_block() + await self.flush() + self.logger.info(f'backed up to height {self.height:,d}') + for touched in self.touched_claims_to_send_es: if not self.db.get_claim_txo(touched): self.removed_claims_to_send_es.add(touched) @@ -375,6 +379,7 @@ class BlockProcessor: async def flush(self): def flush(): self.db.flush_dbs(self.flush_data()) + self.clear_after_advance_or_reorg() await self.run_in_thread_with_lock(flush) async def write_state(self): @@ -1124,7 +1129,7 @@ class BlockProcessor: self.touched_claims_to_send_es.update(self.touched_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes) - def advance_block(self, block): + async def advance_block(self, block): height = self.height + 1 # print("advance ", height) # Use local vars for speed in the loops @@ -1240,9 +1245,6 @@ class BlockProcessor: self.db.headers.append(block.header) self.tip = self.coin.header_hash(block.header) - self.db.flush_dbs(self.flush_data()) - self.clear_after_advance_or_reorg() - def clear_after_advance_or_reorg(self): self.db_op_stack.clear() self.txo_to_claim.clear() @@ -1273,8 +1275,8 @@ class BlockProcessor: self.pending_reposted.clear() self.pending_channel_counts.clear() - def backup_block(self): - self.db.assert_flushed(self.flush_data()) + async def backup_block(self): + # self.db.assert_flushed(self.flush_data()) self.logger.info("backup block %i", self.height) # Check and update self.tip undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height) @@ -1298,9 +1300,6 @@ class BlockProcessor: # self.touched can include other addresses which is # harmless, but remove None. self.touched_hashXs.discard(None) - self.db.flush_backup(self.flush_data()) - self.clear_after_advance_or_reorg() - self.logger.info(f'backed up to height {self.height:,d}') def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]: hashX = self.coin.hashX_from_script(txout.pk_script)