diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 97427c0d3..929f772cd 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -466,7 +466,7 @@ class BlockProcessor: previous_claim = self._make_pending_claim_txo(claim_hash) root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position activation = self.db.get_activation(prev_tx_num, prev_idx) - self.db_op_stack.extend( + self.db_op_stack.extend_ops( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, claim_name, previous_claim.amount @@ -479,14 +479,14 @@ class BlockProcessor: ) self.txo_to_claim[(tx_num, nout)] = pending self.claim_hash_to_txo[claim_hash] = (tx_num, nout) - self.db_op_stack.extend(pending.get_add_claim_utxo_ops()) + self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops()) def _add_support(self, txo: 'Output', tx_num: int, nout: int): supported_claim_hash = txo.claim_hash[::-1] self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout)) self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount # print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}") - self.db_op_stack.extend(StagedClaimtrieSupport( + self.db_op_stack.extend_ops(StagedClaimtrieSupport( supported_claim_hash, tx_num, nout, txo.amount ).get_add_support_utxo_ops()) @@ -505,7 +505,7 @@ class BlockProcessor: supported_name = self._get_pending_claim_name(spent_support) # print(f"\tspent support for {spent_support.hex()}") self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) - self.db_op_stack.extend(StagedClaimtrieSupport( + self.db_op_stack.extend_ops(StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount ).get_spend_support_txo_ops()) spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) @@ -517,11 +517,11 @@ class BlockProcessor: if 0 < activation < self.height + 1: self.removed_active_support_amount_by_claim[spent_support].append(support_amount) # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") - self.db_op_stack.extend(StagedClaimtrieSupport( + self.db_op_stack.extend_ops(StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount ).get_spend_support_txo_ops()) if supported_name is not None and activation > 0: - self.db_op_stack.extend(StagedActivation( + self.db_op_stack.extend_ops(StagedActivation( ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, support_amount ).get_remove_activate_ops()) @@ -543,7 +543,7 @@ class BlockProcessor: self.pending_channel_counts[spent.signing_hash] -= 1 spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name) # print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") - self.db_op_stack.extend(spent.get_spend_claim_txo_ops()) + self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops()) return True def _spend_claim_or_support_txo(self, txin, spent_claims): @@ -607,12 +607,12 @@ class BlockProcessor: claim = self._make_pending_claim_txo(signed_claim_hash) self.signatures_changed.add(signed_claim_hash) self.pending_channel_counts[claim_hash] -= 1 - self.db_op_stack.extend(claim.get_invalidate_signature_ops()) + self.db_op_stack.extend_ops(claim.get_invalidate_signature_ops()) for staged in list(self.txo_to_claim.values()): needs_invalidate = staged.claim_hash not in self.doesnt_have_valid_signature if staged.signing_hash == claim_hash and needs_invalidate: - self.db_op_stack.extend(staged.get_invalidate_signature_ops()) + self.db_op_stack.extend_ops(staged.get_invalidate_signature_ops()) self.txo_to_claim[self.claim_hash_to_txo[staged.claim_hash]] = staged.invalidate_signature() self.signatures_changed.add(staged.claim_hash) self.pending_channel_counts[claim_hash] -= 1 @@ -771,7 +771,7 @@ class BlockProcessor: activation = self.db.get_activation(staged.tx_num, staged.position) if activation > 0: # db returns -1 for non-existent txos # removed queued future activation from the db - self.db_op_stack.extend( + self.db_op_stack.extend_ops( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, activation, staged.name, staged.amount @@ -794,7 +794,7 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.txo_to_claim.items(): - self.db_op_stack.extend(get_delayed_activate_ops( + self.db_op_stack.extend_ops(get_delayed_activate_ops( staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False )) @@ -814,7 +814,7 @@ class BlockProcessor: v = supported_claim_info name = v.name staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position) - self.db_op_stack.extend(get_delayed_activate_ops( + self.db_op_stack.extend_ops(get_delayed_activate_ops( name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True )) @@ -884,7 +884,7 @@ class BlockProcessor: if not has_candidate: # remove name takeover entry, the name is now unclaimed controlling = get_controlling(need_takeover) - self.db_op_stack.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) + self.db_op_stack.extend_ops(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) # scan for possible takeovers out of the accumulated activations, of these make sure there # aren't any future activations for the taken over names with yet higher amounts, if there are @@ -975,35 +975,32 @@ class BlockProcessor: break assert None not in (amount, activation) # update the claim that's activating early - self.db_op_stack.extend( + self.db_op_stack.extend_ops( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, activation, name, amount - ).get_remove_activate_ops() - ) - self.db_op_stack.extend( + ).get_remove_activate_ops() + \ StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, height, name, amount ).get_activate_ops() ) + for (k, amount) in activate_in_future[name][winning_including_future_activations]: txo = (k.tx_num, k.position) if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]: - t = ACTIVATED_SUPPORT_TXO_TYPE - self.db_op_stack.extend( + self.db_op_stack.extend_ops( StagedActivation( - t, winning_including_future_activations, k.tx_num, + ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, k.position, k.height, name, amount - ).get_remove_activate_ops() - ) - self.db_op_stack.extend( + ).get_remove_activate_ops() + \ StagedActivation( - t, winning_including_future_activations, k.tx_num, + ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, k.position, height, name, amount ).get_activate_ops() ) - self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) + + self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) self.touched_claim_hashes.add(winning_including_future_activations) if controlling and controlling.claim_hash not in self.abandoned_claims: self.touched_claim_hashes.add(controlling.claim_hash) @@ -1024,19 +1021,19 @@ class BlockProcessor: if previous_pending_activate.height > height: # the claim had a pending activation in the future, move it to now if tx_num < self.tx_count: - self.db_op_stack.extend( + self.db_op_stack.extend_ops( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, previous_pending_activate.height, name, amount ).get_remove_activate_ops() ) - self.db_op_stack.extend( + self.db_op_stack.extend_ops( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, height, name, amount ).get_activate_ops() ) - self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) + self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) if controlling and controlling.claim_hash not in self.abandoned_claims: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning_claim_hash) @@ -1062,7 +1059,7 @@ class BlockProcessor: winning = max(amounts, key=lambda x: amounts[x]) if (controlling and winning != controlling.claim_hash) or (not controlling and winning): # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") - self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling)) + self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning, height, controlling)) if controlling: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning) @@ -1086,7 +1083,7 @@ class BlockProcessor: removed_claim.name, removed ) if amt: - self.db_op_stack.extend(get_remove_effective_amount_ops( + self.db_op_stack.extend_ops(get_remove_effective_amount_ops( removed_claim.name, amt.effective_amount, amt.tx_num, amt.position, removed )) @@ -1098,7 +1095,7 @@ class BlockProcessor: if claim_from_db: claim_amount_info = self.db.get_url_effective_amount(name, touched) if claim_amount_info: - self.db_op_stack.extend(get_remove_effective_amount_ops( + self.db_op_stack.extend_ops(get_remove_effective_amount_ops( name, claim_amount_info.effective_amount, claim_amount_info.tx_num, claim_amount_info.position, touched )) @@ -1109,10 +1106,10 @@ class BlockProcessor: name, tx_num, position = v.name, v.tx_num, v.position amt = self.db.get_url_effective_amount(name, touched) if amt: - self.db_op_stack.extend(get_remove_effective_amount_ops( + self.db_op_stack.extend_ops(get_remove_effective_amount_ops( name, amt.effective_amount, amt.tx_num, amt.position, touched )) - self.db_op_stack.extend( + self.db_op_stack.extend_ops( get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), tx_num, position, touched) ) @@ -1130,24 +1127,24 @@ class BlockProcessor: def advance_block(self, block): height = self.height + 1 # print("advance ", height) - txs: List[Tuple[Tx, bytes]] = block.transactions - block_hash = self.coin.header_hash(block.header) - - self.db_op_stack.append(RevertablePut(*Prefixes.block_hash.pack_item(height, block_hash))) - - tx_count = self.tx_count - # Use local vars for speed in the loops + tx_count = self.tx_count spend_utxo = self.spend_utxo add_utxo = self.add_utxo spend_claim_or_support_txo = self._spend_claim_or_support_txo add_claim_or_support = self._add_claim_or_support + txs: List[Tuple[Tx, bytes]] = block.transactions + + self.db_op_stack.extend_ops([ + RevertablePut(*Prefixes.block_hash.pack_item(height, self.coin.header_hash(block.header))), + RevertablePut(*Prefixes.header.pack_item(height, block.header)) + ]) for tx, tx_hash in txs: spent_claims = {} txos = Transaction(tx.raw).outputs - self.db_op_stack.extend([ + self.db_op_stack.extend_ops([ RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)), RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)), RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash)) @@ -1208,13 +1205,12 @@ class BlockProcessor: # update effective amount and update sets of touched and deleted claims self._get_cumulative_update_ops() - self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header))) - self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count))) + self.db_op_stack.append_op(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count))) for hashX, new_history in self.hashXs_by_tx.items(): if not new_history: continue - self.db_op_stack.append( + self.db_op_stack.append_op( RevertablePut( *Prefixes.hashX_history.pack_item( hashX, height, new_history @@ -1227,14 +1223,14 @@ class BlockProcessor: cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit if height >= cached_max_reorg_depth: - self.db_op_stack.append( + self.db_op_stack.append_op( RevertablePut( *Prefixes.touched_or_deleted.pack_item( height, self.touched_claim_hashes, self.removed_claim_hashes ) ) ) - self.db_op_stack.append( + self.db_op_stack.append_op( RevertablePut( *Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops()) ) @@ -1284,7 +1280,7 @@ class BlockProcessor: undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height) if undo_ops is None: raise ChainError(f'no undo information found for height {self.height:,d}') - self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops)) + self.db_op_stack.append_op(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops)) self.db_op_stack.apply_packed_undo_ops(undo_ops) touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes) @@ -1311,7 +1307,7 @@ class BlockProcessor: if hashX: self.touched_hashXs.add(hashX) self.utxo_cache[(tx_hash, nout)] = hashX - self.db_op_stack.extend([ + self.db_op_stack.extend_ops([ RevertablePut( *Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value) ), diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 604c7c60f..7365b8dbe 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -84,7 +84,7 @@ class RevertableOpStack: self._get = get_fn self._items = defaultdict(list) - def append(self, op: RevertableOp): + def append_op(self, op: RevertableOp): inverted = op.invert() if self._items[op.key] and inverted == self._items[op.key][-1]: self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both @@ -109,9 +109,9 @@ class RevertableOpStack: raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") self._items[op.key].append(op) - def extend(self, ops: Iterable[RevertableOp]): + def extend_ops(self, ops: Iterable[RevertableOp]): for op in ops: - self.append(op) + self.append_op(op) def clear(self): self._items.clear() @@ -135,4 +135,4 @@ class RevertableOpStack: def apply_packed_undo_ops(self, packed: bytes): while packed: op, packed = RevertableOp.unpack(packed) - self.append(op) + self.append_op(op)