diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 6ef0c50b7..64e060c57 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -411,52 +411,6 @@ class BlockProcessor: return utxo_MB >= cache_MB * 4 // 5 return None - def advance_blocks(self, blocks): - """Synchronously advance the blocks. - - It is already verified they correctly connect onto our tip. - """ - min_height = self.db.min_undo_height(self.daemon.cached_height()) - height = self.height - # print("---------------------------------\nFLUSH\n---------------------------------") - - for block in blocks: - height += 1 - # print(f"***********************************\nADVANCE {height}\n***********************************") - undo_info, undo_claims = self.advance_block(block, height) - if height >= min_height: - self.undo_infos.append((undo_info, height)) - self.undo_claims.append((undo_claims, height)) - self.db.write_raw_block(block.raw, height) - - for touched_claim_hash, amount_changes in self.effective_amount_changes.items(): - new_effective_amount = sum(amount_changes) - assert new_effective_amount >= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}' - self.claimtrie_stash.extend( - self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount) - ) - # print("update effective amount to", touched_claim_hash.hex(), new_effective_amount) - - headers = [block.header for block in blocks] - self.height = height - self.headers.extend(headers) - self.tip = self.coin.header_hash(headers[-1]) - - self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining) - # print("+++++++++++++++++++++++++++++++++++++++++++++\nFLUSHED\n+++++++++++++++++++++++++++++++++++++++++++++") - - self.effective_amount_changes.clear() - self.pending_claims.clear() - self.pending_claim_txos.clear() - self.pending_supports.clear() - self.pending_support_txos.clear() - self.pending_abandon.clear() - - for cache in self.search_cache.values(): - cache.clear() - self.history_cache.clear() - self.notifications.notified_mempool_txs.clear() - def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout, spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']: try: @@ -695,9 +649,9 @@ class BlockProcessor: ops.extend(self._abandon(spent_claims)) return ops - def advance_block(self, block, height: int): + def advance_block(self, block): + height = self.height + 1 txs: List[Tuple[Tx, bytes]] = block.transactions - # header = self.coin.electrum_header(block.header, height) block_hash = self.coin.header_hash(block.header) self.block_hashes.append(block_hash) @@ -721,7 +675,6 @@ class BlockProcessor: # unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} for tx, tx_hash in txs: - # print(f"{tx_hash[::-1].hex()} @ {height}") spent_claims = {} hashXs = [] # hashXs touched by spent inputs/rx outputs @@ -785,24 +738,44 @@ class BlockProcessor: _unflushed[_hashX].append(_tx_num) _count += len(_hashXs) self.db.hist_unflushed_count += _count - self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) - # for touched_claim_hash, amount_changes in self.effective_amount_changes.items(): - # new_effective_amount = sum(amount_changes) - # assert new_effective_amount >= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}' - # if touched_claim_hash not in unchanged_effective_amounts or unchanged_effective_amounts[touched_claim_hash] != new_effective_amount: - # claimtrie_stash_extend( - # self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount) - # ) - # # print("update effective amount to", touched_claim_hash.hex(), new_effective_amount) + for touched_claim_hash, amount_changes in self.effective_amount_changes.items(): + new_effective_amount = sum(amount_changes) + assert new_effective_amount >= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}' + claimtrie_stash.extend( + self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount) + ) undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash) self.claimtrie_stash.extend(claimtrie_stash) # print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash))) - return undo_info, undo_claims + if height >= self.daemon.cached_height() - self.env.reorg_limit: + self.undo_infos.append((undo_info, height)) + self.undo_claims.append((undo_claims, height)) + self.db.write_raw_block(block.raw, height) + + self.height = height + self.headers.append(block.header) + self.tip = self.coin.header_hash(block.header) + + self.db.flush_dbs(self.flush_data()) + + self.effective_amount_changes.clear() + + self.pending_claims.clear() + self.pending_claim_txos.clear() + self.pending_supports.clear() + self.pending_support_txos.clear() + self.pending_abandon.clear() + self.staged_pending_abandoned.clear() + + for cache in self.search_cache.values(): + cache.clear() + self.history_cache.clear() + self.notifications.notified_mempool_txs.clear() def backup_blocks(self, raw_blocks): """Backup the raw blocks and flush. @@ -830,12 +803,12 @@ class BlockProcessor: self.height -= 1 self.db.tx_counts.pop() - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) - self.db.flush_backup(self.flush_data(), self.touched) - self.logger.info(f'backed up to height {self.height:,d}') + self.db.flush_backup(self.flush_data(), self.touched) + self.logger.info(f'backed up to height {self.height:,d}') def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present)