forked from LBRYCommunity/lbry-sdk
split flush from advance_block
This commit is contained in:
parent bfbe7c1bf5
commit a48fe84971

1 changed file with 10 additions and 11 deletions
@@ -302,7 +302,8 @@ class BlockProcessor:
         try:
             for block in blocks:
                 start = time.perf_counter()
-                await self.run_in_thread_with_lock(self.advance_block, block)
+                await self.advance_block(block)
+                await self.flush()
                 self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
                 # TODO: we shouldnt wait on the search index updating before advancing to the next block
                 if not self.db.first_sync:
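This first hunk changes the shape of the advance loop: previously the whole advance_block call was dispatched through run_in_thread_with_lock, so block application and the database flush both happened in a worker thread under the state lock; now advance_block is awaited directly as a coroutine and only the flush step is handed off. A minimal sketch of the resulting control flow, assuming the post-change interface (process_blocks and the print call are illustrative names, not part of the diff):

import time

async def process_blocks(processor, blocks):
    # Illustrative driver over the split interface: advance_block() does the
    # in-memory bookkeeping on the event loop, flush() persists it afterwards.
    for block in blocks:
        start = time.perf_counter()
        await processor.advance_block(block)   # mutate in-memory state only
        await processor.flush()                # write it out (thread + lock)
        print(f"advanced to {processor.height} in {time.perf_counter() - start:0.3f}s")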
@@ -340,7 +341,10 @@ class BlockProcessor:
         try:
             assert count > 0, count
             for _ in range(count):
-                await self.run_in_thread_with_lock(self.backup_block)
+                await self.backup_block()
+                await self.flush()
+                self.logger.info(f'backed up to height {self.height:,d}')
+
             for touched in self.touched_claims_to_send_es:
                 if not self.db.get_claim_txo(touched):
                     self.removed_claims_to_send_es.add(touched)
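The reconciliation loop kept as context above decides what to tell the search index (the _es suffix suggests Elasticsearch) after a backup: a claim that was touched but no longer resolves to a claim TXO is reported as removed rather than updated. A tiny self-contained sketch of that rule, with a plain dict standing in for self.db.get_claim_txo:

touched_claims_to_send_es = {b'claim-a', b'claim-b'}
removed_claims_to_send_es = set()
claim_txos = {b'claim-a': 'txo'}  # claim-b no longer exists after the backup

for touched in touched_claims_to_send_es:
    if not claim_txos.get(touched):           # stand-in for db.get_claim_txo(touched)
        removed_claims_to_send_es.add(touched)

assert removed_claims_to_send_es == {b'claim-b'}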
@@ -375,6 +379,7 @@ class BlockProcessor:
     async def flush(self):
         def flush():
             self.db.flush_dbs(self.flush_data())
+            self.clear_after_advance_or_reorg()
         await self.run_in_thread_with_lock(flush)
 
     async def write_state(self):
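flush() delegates to run_in_thread_with_lock, which this diff does not show. As a hedged sketch of what such a helper usually looks like in an asyncio block processor (the body below is an assumption for illustration, not necessarily this repository's implementation): the blocking callable runs in the default thread-pool executor while the processor's state lock is held, shielded so a cancelled caller cannot interrupt a half-finished write.

import asyncio

class ProcessorSketch:
    def __init__(self):
        self.state_lock = asyncio.Lock()  # assumed name; guards in-memory state

    async def run_in_thread_with_lock(self, func, *args):
        async def run_locked():
            # No coroutine may mutate in-memory state while the blocking
            # function (e.g. the flush closure) runs in a worker thread.
            async with self.state_lock:
                return await asyncio.get_running_loop().run_in_executor(None, func, *args)
        # shield() keeps task cancellation from tearing apart a half-done write.
        return await asyncio.shield(run_locked())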
@@ -1124,7 +1129,7 @@ class BlockProcessor:
         self.touched_claims_to_send_es.update(self.touched_claim_hashes)
         self.removed_claims_to_send_es.update(self.removed_claim_hashes)
 
-    def advance_block(self, block):
+    async def advance_block(self, block):
         height = self.height + 1
         # print("advance ", height)
         # Use local vars for speed in the loops
@@ -1240,9 +1245,6 @@ class BlockProcessor:
         self.db.headers.append(block.header)
         self.tip = self.coin.header_hash(block.header)
 
-        self.db.flush_dbs(self.flush_data())
-        self.clear_after_advance_or_reorg()
-
     def clear_after_advance_or_reorg(self):
         self.db_op_stack.clear()
         self.txo_to_claim.clear()
@@ -1273,8 +1275,8 @@ class BlockProcessor:
         self.pending_reposted.clear()
         self.pending_channel_counts.clear()
 
-    def backup_block(self):
-        self.db.assert_flushed(self.flush_data())
+    async def backup_block(self):
+        # self.db.assert_flushed(self.flush_data())
         self.logger.info("backup block %i", self.height)
         # Check and update self.tip
         undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
@@ -1298,9 +1300,6 @@ class BlockProcessor:
         # self.touched can include other addresses which is
         # harmless, but remove None.
         self.touched_hashXs.discard(None)
-        self.db.flush_backup(self.flush_data())
-        self.clear_after_advance_or_reorg()
-        self.logger.info(f'backed up to height {self.height:,d}')
 
     def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
         hashX = self.coin.hashX_from_script(txout.pk_script)