rename extend_ops

This commit is contained in:
Jack Robison 2021-07-24 14:34:03 -04:00
parent 73ba381d20
commit 30b923b283
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 48 additions and 52 deletions

View file

@ -466,7 +466,7 @@ class BlockProcessor:
previous_claim = self._make_pending_claim_txo(claim_hash) previous_claim = self._make_pending_claim_txo(claim_hash)
root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position
activation = self.db.get_activation(prev_tx_num, prev_idx) activation = self.db.get_activation(prev_tx_num, prev_idx)
self.db_op_stack.extend( self.db_op_stack.extend_ops(
StagedActivation( StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, claim_name, ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, claim_name,
previous_claim.amount previous_claim.amount
@ -479,14 +479,14 @@ class BlockProcessor:
) )
self.txo_to_claim[(tx_num, nout)] = pending self.txo_to_claim[(tx_num, nout)] = pending
self.claim_hash_to_txo[claim_hash] = (tx_num, nout) self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
self.db_op_stack.extend(pending.get_add_claim_utxo_ops()) self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
def _add_support(self, txo: 'Output', tx_num: int, nout: int): def _add_support(self, txo: 'Output', tx_num: int, nout: int):
supported_claim_hash = txo.claim_hash[::-1] supported_claim_hash = txo.claim_hash[::-1]
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout)) self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
# print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}") # print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}")
self.db_op_stack.extend(StagedClaimtrieSupport( self.db_op_stack.extend_ops(StagedClaimtrieSupport(
supported_claim_hash, tx_num, nout, txo.amount supported_claim_hash, tx_num, nout, txo.amount
).get_add_support_utxo_ops()) ).get_add_support_utxo_ops())
@ -505,7 +505,7 @@ class BlockProcessor:
supported_name = self._get_pending_claim_name(spent_support) supported_name = self._get_pending_claim_name(spent_support)
# print(f"\tspent support for {spent_support.hex()}") # print(f"\tspent support for {spent_support.hex()}")
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
self.db_op_stack.extend(StagedClaimtrieSupport( self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops()) ).get_spend_support_txo_ops())
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
@ -517,11 +517,11 @@ class BlockProcessor:
if 0 < activation < self.height + 1: if 0 < activation < self.height + 1:
self.removed_active_support_amount_by_claim[spent_support].append(support_amount) self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
self.db_op_stack.extend(StagedClaimtrieSupport( self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops()) ).get_spend_support_txo_ops())
if supported_name is not None and activation > 0: if supported_name is not None and activation > 0:
self.db_op_stack.extend(StagedActivation( self.db_op_stack.extend_ops(StagedActivation(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
support_amount support_amount
).get_remove_activate_ops()) ).get_remove_activate_ops())
@ -543,7 +543,7 @@ class BlockProcessor:
self.pending_channel_counts[spent.signing_hash] -= 1 self.pending_channel_counts[spent.signing_hash] -= 1
spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name) spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name)
# print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") # print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
self.db_op_stack.extend(spent.get_spend_claim_txo_ops()) self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops())
return True return True
def _spend_claim_or_support_txo(self, txin, spent_claims): def _spend_claim_or_support_txo(self, txin, spent_claims):
@ -607,12 +607,12 @@ class BlockProcessor:
claim = self._make_pending_claim_txo(signed_claim_hash) claim = self._make_pending_claim_txo(signed_claim_hash)
self.signatures_changed.add(signed_claim_hash) self.signatures_changed.add(signed_claim_hash)
self.pending_channel_counts[claim_hash] -= 1 self.pending_channel_counts[claim_hash] -= 1
self.db_op_stack.extend(claim.get_invalidate_signature_ops()) self.db_op_stack.extend_ops(claim.get_invalidate_signature_ops())
for staged in list(self.txo_to_claim.values()): for staged in list(self.txo_to_claim.values()):
needs_invalidate = staged.claim_hash not in self.doesnt_have_valid_signature needs_invalidate = staged.claim_hash not in self.doesnt_have_valid_signature
if staged.signing_hash == claim_hash and needs_invalidate: if staged.signing_hash == claim_hash and needs_invalidate:
self.db_op_stack.extend(staged.get_invalidate_signature_ops()) self.db_op_stack.extend_ops(staged.get_invalidate_signature_ops())
self.txo_to_claim[self.claim_hash_to_txo[staged.claim_hash]] = staged.invalidate_signature() self.txo_to_claim[self.claim_hash_to_txo[staged.claim_hash]] = staged.invalidate_signature()
self.signatures_changed.add(staged.claim_hash) self.signatures_changed.add(staged.claim_hash)
self.pending_channel_counts[claim_hash] -= 1 self.pending_channel_counts[claim_hash] -= 1
@ -771,7 +771,7 @@ class BlockProcessor:
activation = self.db.get_activation(staged.tx_num, staged.position) activation = self.db.get_activation(staged.tx_num, staged.position)
if activation > 0: # db returns -1 for non-existent txos if activation > 0: # db returns -1 for non-existent txos
# removed queued future activation from the db # removed queued future activation from the db
self.db_op_stack.extend( self.db_op_stack.extend_ops(
StagedActivation( StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
activation, staged.name, staged.amount activation, staged.name, staged.amount
@ -794,7 +794,7 @@ class BlockProcessor:
# prepare to activate or delay activation of the pending claims being added this block # prepare to activate or delay activation of the pending claims being added this block
for (tx_num, nout), staged in self.txo_to_claim.items(): for (tx_num, nout), staged in self.txo_to_claim.items():
self.db_op_stack.extend(get_delayed_activate_ops( self.db_op_stack.extend_ops(get_delayed_activate_ops(
staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False
)) ))
@ -814,7 +814,7 @@ class BlockProcessor:
v = supported_claim_info v = supported_claim_info
name = v.name name = v.name
staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position) staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position)
self.db_op_stack.extend(get_delayed_activate_ops( self.db_op_stack.extend_ops(get_delayed_activate_ops(
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
)) ))
@ -884,7 +884,7 @@ class BlockProcessor:
if not has_candidate: if not has_candidate:
# remove name takeover entry, the name is now unclaimed # remove name takeover entry, the name is now unclaimed
controlling = get_controlling(need_takeover) controlling = get_controlling(need_takeover)
self.db_op_stack.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) self.db_op_stack.extend_ops(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
# scan for possible takeovers out of the accumulated activations, of these make sure there # scan for possible takeovers out of the accumulated activations, of these make sure there
# aren't any future activations for the taken over names with yet higher amounts, if there are # aren't any future activations for the taken over names with yet higher amounts, if there are
@ -975,35 +975,32 @@ class BlockProcessor:
break break
assert None not in (amount, activation) assert None not in (amount, activation)
# update the claim that's activating early # update the claim that's activating early
self.db_op_stack.extend( self.db_op_stack.extend_ops(
StagedActivation( StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, activation, name, amount position, activation, name, amount
).get_remove_activate_ops() ).get_remove_activate_ops() + \
)
self.db_op_stack.extend(
StagedActivation( StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, height, name, amount position, height, name, amount
).get_activate_ops() ).get_activate_ops()
) )
for (k, amount) in activate_in_future[name][winning_including_future_activations]: for (k, amount) in activate_in_future[name][winning_including_future_activations]:
txo = (k.tx_num, k.position) txo = (k.tx_num, k.position)
if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]: if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]:
t = ACTIVATED_SUPPORT_TXO_TYPE self.db_op_stack.extend_ops(
self.db_op_stack.extend(
StagedActivation( StagedActivation(
t, winning_including_future_activations, k.tx_num, ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
k.position, k.height, name, amount k.position, k.height, name, amount
).get_remove_activate_ops() ).get_remove_activate_ops() + \
)
self.db_op_stack.extend(
StagedActivation( StagedActivation(
t, winning_including_future_activations, k.tx_num, ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
k.position, height, name, amount k.position, height, name, amount
).get_activate_ops() ).get_activate_ops()
) )
self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
self.touched_claim_hashes.add(winning_including_future_activations) self.touched_claim_hashes.add(winning_including_future_activations)
if controlling and controlling.claim_hash not in self.abandoned_claims: if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(controlling.claim_hash)
@ -1024,19 +1021,19 @@ class BlockProcessor:
if previous_pending_activate.height > height: if previous_pending_activate.height > height:
# the claim had a pending activation in the future, move it to now # the claim had a pending activation in the future, move it to now
if tx_num < self.tx_count: if tx_num < self.tx_count:
self.db_op_stack.extend( self.db_op_stack.extend_ops(
StagedActivation( StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, previous_pending_activate.height, name, amount position, previous_pending_activate.height, name, amount
).get_remove_activate_ops() ).get_remove_activate_ops()
) )
self.db_op_stack.extend( self.db_op_stack.extend_ops(
StagedActivation( StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, height, name, amount position, height, name, amount
).get_activate_ops() ).get_activate_ops()
) )
self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
if controlling and controlling.claim_hash not in self.abandoned_claims: if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning_claim_hash) self.touched_claim_hashes.add(winning_claim_hash)
@ -1062,7 +1059,7 @@ class BlockProcessor:
winning = max(amounts, key=lambda x: amounts[x]) winning = max(amounts, key=lambda x: amounts[x])
if (controlling and winning != controlling.claim_hash) or (not controlling and winning): if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling)) self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning, height, controlling))
if controlling: if controlling:
self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning) self.touched_claim_hashes.add(winning)
@ -1086,7 +1083,7 @@ class BlockProcessor:
removed_claim.name, removed removed_claim.name, removed
) )
if amt: if amt:
self.db_op_stack.extend(get_remove_effective_amount_ops( self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
removed_claim.name, amt.effective_amount, amt.tx_num, removed_claim.name, amt.effective_amount, amt.tx_num,
amt.position, removed amt.position, removed
)) ))
@ -1098,7 +1095,7 @@ class BlockProcessor:
if claim_from_db: if claim_from_db:
claim_amount_info = self.db.get_url_effective_amount(name, touched) claim_amount_info = self.db.get_url_effective_amount(name, touched)
if claim_amount_info: if claim_amount_info:
self.db_op_stack.extend(get_remove_effective_amount_ops( self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
name, claim_amount_info.effective_amount, claim_amount_info.tx_num, name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
claim_amount_info.position, touched claim_amount_info.position, touched
)) ))
@ -1109,10 +1106,10 @@ class BlockProcessor:
name, tx_num, position = v.name, v.tx_num, v.position name, tx_num, position = v.name, v.tx_num, v.position
amt = self.db.get_url_effective_amount(name, touched) amt = self.db.get_url_effective_amount(name, touched)
if amt: if amt:
self.db_op_stack.extend(get_remove_effective_amount_ops( self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
name, amt.effective_amount, amt.tx_num, amt.position, touched name, amt.effective_amount, amt.tx_num, amt.position, touched
)) ))
self.db_op_stack.extend( self.db_op_stack.extend_ops(
get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
tx_num, position, touched) tx_num, position, touched)
) )
@ -1130,24 +1127,24 @@ class BlockProcessor:
def advance_block(self, block): def advance_block(self, block):
height = self.height + 1 height = self.height + 1
# print("advance ", height) # print("advance ", height)
txs: List[Tuple[Tx, bytes]] = block.transactions
block_hash = self.coin.header_hash(block.header)
self.db_op_stack.append(RevertablePut(*Prefixes.block_hash.pack_item(height, block_hash)))
tx_count = self.tx_count
# Use local vars for speed in the loops # Use local vars for speed in the loops
tx_count = self.tx_count
spend_utxo = self.spend_utxo spend_utxo = self.spend_utxo
add_utxo = self.add_utxo add_utxo = self.add_utxo
spend_claim_or_support_txo = self._spend_claim_or_support_txo spend_claim_or_support_txo = self._spend_claim_or_support_txo
add_claim_or_support = self._add_claim_or_support add_claim_or_support = self._add_claim_or_support
txs: List[Tuple[Tx, bytes]] = block.transactions
self.db_op_stack.extend_ops([
RevertablePut(*Prefixes.block_hash.pack_item(height, self.coin.header_hash(block.header))),
RevertablePut(*Prefixes.header.pack_item(height, block.header))
])
for tx, tx_hash in txs: for tx, tx_hash in txs:
spent_claims = {} spent_claims = {}
txos = Transaction(tx.raw).outputs txos = Transaction(tx.raw).outputs
self.db_op_stack.extend([ self.db_op_stack.extend_ops([
RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)), RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)),
RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)), RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)),
RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash)) RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash))
@ -1208,13 +1205,12 @@ class BlockProcessor:
# update effective amount and update sets of touched and deleted claims # update effective amount and update sets of touched and deleted claims
self._get_cumulative_update_ops() self._get_cumulative_update_ops()
self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header))) self.db_op_stack.append_op(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
for hashX, new_history in self.hashXs_by_tx.items(): for hashX, new_history in self.hashXs_by_tx.items():
if not new_history: if not new_history:
continue continue
self.db_op_stack.append( self.db_op_stack.append_op(
RevertablePut( RevertablePut(
*Prefixes.hashX_history.pack_item( *Prefixes.hashX_history.pack_item(
hashX, height, new_history hashX, height, new_history
@ -1227,14 +1223,14 @@ class BlockProcessor:
cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit
if height >= cached_max_reorg_depth: if height >= cached_max_reorg_depth:
self.db_op_stack.append( self.db_op_stack.append_op(
RevertablePut( RevertablePut(
*Prefixes.touched_or_deleted.pack_item( *Prefixes.touched_or_deleted.pack_item(
height, self.touched_claim_hashes, self.removed_claim_hashes height, self.touched_claim_hashes, self.removed_claim_hashes
) )
) )
) )
self.db_op_stack.append( self.db_op_stack.append_op(
RevertablePut( RevertablePut(
*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops()) *Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())
) )
@ -1284,7 +1280,7 @@ class BlockProcessor:
undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height) undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
if undo_ops is None: if undo_ops is None:
raise ChainError(f'no undo information found for height {self.height:,d}') raise ChainError(f'no undo information found for height {self.height:,d}')
self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops)) self.db_op_stack.append_op(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
self.db_op_stack.apply_packed_undo_ops(undo_ops) self.db_op_stack.apply_packed_undo_ops(undo_ops)
touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes) touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes)
@ -1311,7 +1307,7 @@ class BlockProcessor:
if hashX: if hashX:
self.touched_hashXs.add(hashX) self.touched_hashXs.add(hashX)
self.utxo_cache[(tx_hash, nout)] = hashX self.utxo_cache[(tx_hash, nout)] = hashX
self.db_op_stack.extend([ self.db_op_stack.extend_ops([
RevertablePut( RevertablePut(
*Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value) *Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value)
), ),

View file

@ -84,7 +84,7 @@ class RevertableOpStack:
self._get = get_fn self._get = get_fn
self._items = defaultdict(list) self._items = defaultdict(list)
def append(self, op: RevertableOp): def append_op(self, op: RevertableOp):
inverted = op.invert() inverted = op.invert()
if self._items[op.key] and inverted == self._items[op.key][-1]: if self._items[op.key] and inverted == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
@ -109,9 +109,9 @@ class RevertableOpStack:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
self._items[op.key].append(op) self._items[op.key].append(op)
def extend(self, ops: Iterable[RevertableOp]): def extend_ops(self, ops: Iterable[RevertableOp]):
for op in ops: for op in ops:
self.append(op) self.append_op(op)
def clear(self): def clear(self):
self._items.clear() self._items.clear()
@ -135,4 +135,4 @@ class RevertableOpStack:
def apply_packed_undo_ops(self, packed: bytes): def apply_packed_undo_ops(self, packed: bytes):
while packed: while packed:
op, packed = RevertableOp.unpack(packed) op, packed = RevertableOp.unpack(packed)
self.append(op) self.append_op(op)