rename things

-fix effective amount integrity error
This commit is contained in:
Jack Robison 2021-07-01 17:23:27 -04:00
parent 2ee419ffca
commit cf66c2a1ee
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 176 additions and 170 deletions

View file

@ -207,7 +207,7 @@ class BlockProcessor:
self.db_deletes = []
# Claimtrie cache
self.claimtrie_stash = None
self.db_op_stack = None
self.undo_claims = []
# If the lock is successfully acquired, in-memory chain state
@ -223,31 +223,31 @@ class BlockProcessor:
#################################
# txo to pending claim
self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {}
self.txo_to_claim: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {}
# claim hash to pending claim txo
self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {}
self.claim_hash_to_txo: typing.Dict[bytes, Tuple[int, int]] = {}
# claim hash to lists of pending support txos
self.pending_supports: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
self.support_txos_by_claim: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
# support txo: (supported claim hash, support amount)
self.pending_support_txos: Dict[Tuple[int, int], Tuple[bytes, int]] = {}
self.support_txo_to_claim: Dict[Tuple[int, int], Tuple[bytes, int]] = {}
# removed supports {name: {claim_hash: [(tx_num, nout), ...]}}
self.pending_removed_support: DefaultDict[str, DefaultDict[bytes, List[Tuple[int, int]]]] = defaultdict(
lambda: defaultdict(list))
self.staged_pending_abandoned: Dict[bytes, StagedClaimtrieItem] = {}
self.removed_support_txos_by_name_by_claim: DefaultDict[str, DefaultDict[bytes, List[Tuple[int, int]]]] = \
defaultdict(lambda: defaultdict(list))
self.abandoned_claims: Dict[bytes, StagedClaimtrieItem] = {}
# removed activated support amounts by claim hash
self.removed_active_support: DefaultDict[bytes, List[int]] = defaultdict(list)
self.removed_active_support_amount_by_claim: DefaultDict[bytes, List[int]] = defaultdict(list)
# pending activated support amounts by claim hash
self.staged_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list)
self.activated_support_amount_by_claim: DefaultDict[bytes, List[int]] = defaultdict(list)
# pending activated name and claim hash to claim/update txo amount
self.staged_activated_claim: Dict[Tuple[str, bytes], int] = {}
self.activated_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {}
# pending claim and support activations per claim hash per name,
# used to process takeovers due to added activations
self.pending_activated: DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]] = \
defaultdict(lambda: defaultdict(list))
activation_by_claim_by_name_type = DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]]
self.activation_by_claim_by_name: activation_by_claim_by_name_type = defaultdict(lambda: defaultdict(list))
# these are used for detecting early takeovers by not yet activated claims/supports
self.possible_future_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list)
self.possible_future_activated_claim: Dict[Tuple[str, bytes], int] = {}
self.possible_future_support_txos: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
self.possible_future_support_amounts_by_claim_hash: DefaultDict[bytes, List[int]] = defaultdict(list)
self.possible_future_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {}
self.possible_future_support_txos_by_claim_hash: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
self.removed_claims_to_send_es = set()
self.touched_claims_to_send_es = set()
@ -438,7 +438,7 @@ class BlockProcessor:
"""The data for a flush. The lock must be taken."""
assert self.state_lock.locked()
return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
self.block_txs, self.claimtrie_stash, self.undo_infos, self.utxo_cache,
self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip, self.undo_claims)
async def flush(self, flush_utxos):
@ -528,8 +528,8 @@ class BlockProcessor:
return []
(prev_tx_num, prev_idx, _) = spent_claims.pop(claim_hash)
# print(f"\tupdate {claim_hash.hex()} {tx_hash[::-1].hex()} {txo.amount}")
if (prev_tx_num, prev_idx) in self.pending_claims:
previous_claim = self.pending_claims.pop((prev_tx_num, prev_idx))
if (prev_tx_num, prev_idx) in self.txo_to_claim:
previous_claim = self.txo_to_claim.pop((prev_tx_num, prev_idx))
root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position
else:
v = self.db.get_claim_txo(
@ -546,15 +546,15 @@ class BlockProcessor:
claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num,
root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
)
self.pending_claims[(tx_num, nout)] = pending
self.pending_claim_txos[claim_hash] = (tx_num, nout)
self.txo_to_claim[(tx_num, nout)] = pending
self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
ops.extend(pending.get_add_claim_utxo_ops())
return ops
def _add_support(self, txo: 'Output', tx_num: int, nout: int) -> List['RevertableOp']:
supported_claim_hash = txo.claim_hash[::-1]
self.pending_supports[supported_claim_hash].append((tx_num, nout))
self.pending_support_txos[(tx_num, nout)] = supported_claim_hash, txo.amount
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
# print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}")
return StagedClaimtrieSupport(
supported_claim_hash, tx_num, nout, txo.amount
@ -570,12 +570,12 @@ class BlockProcessor:
def _spend_support_txo(self, txin):
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
if (txin_num, txin.prev_idx) in self.pending_support_txos:
spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx))
self.pending_supports[spent_support].remove((txin_num, txin.prev_idx))
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx))
supported_name = self._get_pending_claim_name(spent_support)
# print(f"\tspent support for {spent_support.hex()}")
self.pending_removed_support[supported_name][spent_support].append((txin_num, txin.prev_idx))
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
return StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops()
@ -583,10 +583,10 @@ class BlockProcessor:
if spent_support:
supported_name = self._get_pending_claim_name(spent_support)
if supported_name is not None:
self.pending_removed_support[supported_name][spent_support].append((txin_num, txin.prev_idx))
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True)
if 0 < activation < self.height + 1:
self.removed_active_support[spent_support].append(support_amount)
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
ops = StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
@ -601,8 +601,8 @@ class BlockProcessor:
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]):
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
if (txin_num, txin.prev_idx) in self.pending_claims:
spent = self.pending_claims[(txin_num, txin.prev_idx)]
if (txin_num, txin.prev_idx) in self.txo_to_claim:
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
else:
spent_claim_hash_and_name = self.db.get_claim_from_txo(
txin_num, txin.prev_idx
@ -635,9 +635,9 @@ class BlockProcessor:
def _abandon_claim(self, claim_hash, tx_num, nout, name) -> List['RevertableOp']:
claim_from_db = False
if (tx_num, nout) in self.pending_claims:
pending = self.pending_claims.pop((tx_num, nout))
self.staged_pending_abandoned[pending.claim_hash] = pending
if (tx_num, nout) in self.txo_to_claim:
pending = self.txo_to_claim.pop((tx_num, nout))
self.abandoned_claims[pending.claim_hash] = pending
claim_root_tx_num, claim_root_idx = pending.root_tx_num, pending.root_position
prev_amount, prev_signing_hash = pending.amount, pending.signing_hash
reposted_claim_hash = pending.reposted_claim_hash
@ -653,27 +653,27 @@ class BlockProcessor:
prev_signing_hash = self.db.get_channel_for_claim(claim_hash, tx_num, nout)
reposted_claim_hash = self.db.get_repost(claim_hash)
expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num))
self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem(
self.abandoned_claims[claim_hash] = staged = StagedClaimtrieItem(
name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num,
claim_root_idx, signature_is_valid, prev_signing_hash, reposted_claim_hash
)
if prev_signing_hash and prev_signing_hash in self.pending_channel_counts:
self.pending_channel_counts.pop(prev_signing_hash)
for support_txo_to_clear in self.pending_supports[claim_hash]:
self.pending_support_txos.pop(support_txo_to_clear)
self.pending_supports[claim_hash].clear()
self.pending_supports.pop(claim_hash)
for support_txo_to_clear in self.support_txos_by_claim[claim_hash]:
self.support_txo_to_claim.pop(support_txo_to_clear)
self.support_txos_by_claim[claim_hash].clear()
self.support_txos_by_claim.pop(claim_hash)
ops = []
if staged.name.startswith('@'): # abandon a channel, invalidate signatures
for k, claim_hash in self.db.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(staged.claim_hash)):
if claim_hash in self.staged_pending_abandoned:
if claim_hash in self.abandoned_claims:
continue
self.signatures_changed.add(claim_hash)
if claim_hash in self.pending_claim_txos:
claim = self.pending_claims[self.pending_claim_txos[claim_hash]]
if claim_hash in self.claim_hash_to_txo:
claim = self.txo_to_claim[self.claim_hash_to_txo[claim_hash]]
else:
claim = self.db.get_claim_txo(claim_hash)
assert claim is not None
@ -705,7 +705,7 @@ class BlockProcessor:
spent_claims = {}
ops = []
for expired_claim_hash, (tx_num, position, name, txi) in expired.items():
if (tx_num, position) not in self.pending_claims:
if (tx_num, position) not in self.txo_to_claim:
ops.extend(self._spend_claim_txo(txi, spent_claims))
if expired:
# do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned
@ -733,28 +733,28 @@ class BlockProcessor:
)
def _get_pending_claim_amount(self, name: str, claim_hash: bytes, height=None) -> int:
if (name, claim_hash) in self.staged_activated_claim:
return self.staged_activated_claim[(name, claim_hash)]
if (name, claim_hash) in self.possible_future_activated_claim:
return self.possible_future_activated_claim[(name, claim_hash)]
if (name, claim_hash) in self.activated_claim_amount_by_name_and_hash:
return self.activated_claim_amount_by_name_and_hash[(name, claim_hash)]
if (name, claim_hash) in self.possible_future_claim_amount_by_name_and_hash:
return self.possible_future_claim_amount_by_name_and_hash[(name, claim_hash)]
return self._cached_get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height or (self.height + 1))
def _get_pending_claim_name(self, claim_hash: bytes) -> Optional[str]:
assert claim_hash is not None
if claim_hash in self.pending_claims:
return self.pending_claims[claim_hash].name
if claim_hash in self.txo_to_claim:
return self.txo_to_claim[claim_hash].name
claim_info = self.db.get_claim_txo(claim_hash)
if claim_info:
return claim_info.name
def _get_pending_supported_amount(self, claim_hash: bytes, height: Optional[int] = None) -> int:
amount = self._cached_get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, height or (self.height + 1))
if claim_hash in self.staged_activated_support:
amount += sum(self.staged_activated_support[claim_hash])
if claim_hash in self.possible_future_activated_support:
amount += sum(self.possible_future_activated_support[claim_hash])
if claim_hash in self.removed_active_support:
return amount - sum(self.removed_active_support[claim_hash])
if claim_hash in self.activated_support_amount_by_claim:
amount += sum(self.activated_support_amount_by_claim[claim_hash])
if claim_hash in self.possible_future_support_amounts_by_claim_hash:
amount += sum(self.possible_future_support_amounts_by_claim_hash[claim_hash])
if claim_hash in self.removed_active_support_amount_by_claim:
return amount - sum(self.removed_active_support_amount_by_claim[claim_hash])
return amount
def _get_pending_effective_amount(self, name: str, claim_hash: bytes, height: Optional[int] = None) -> int:
@ -815,7 +815,7 @@ class BlockProcessor:
), amount
))
if is_support:
self.possible_future_support_txos[claim_hash].append((tx_num, nout))
self.possible_future_support_txos_by_claim_hash[claim_hash].append((tx_num, nout))
return StagedActivation(
ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout,
height + delay, name, amount
@ -823,7 +823,7 @@ class BlockProcessor:
# determine names needing takeover/deletion due to controlling claims being abandoned
# and add ops to deactivate abandoned claims
for claim_hash, staged in self.staged_pending_abandoned.items():
for claim_hash, staged in self.abandoned_claims.items():
controlling = get_controlling(staged.name)
if controlling and controlling.claim_hash == claim_hash:
names_with_abandoned_controlling_claims.append(staged.name)
@ -831,7 +831,7 @@ class BlockProcessor:
activation = self.db.get_activation(staged.tx_num, staged.position)
if activation > 0: # db returns -1 for non-existent txos
# removed queued future activation from the db
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
activation, staged.name, staged.amount
@ -843,7 +843,7 @@ class BlockProcessor:
# get the removed activated supports for controlling claims to determine if takeovers are possible
abandoned_support_check_need_takeover = defaultdict(list)
for claim_hash, amounts in self.removed_active_support.items():
for claim_hash, amounts in self.removed_active_support_amount_by_claim.items():
name = self._get_pending_claim_name(claim_hash)
if name is None:
continue
@ -853,18 +853,18 @@ class BlockProcessor:
abandoned_support_check_need_takeover[(name, claim_hash)].extend(amounts)
# prepare to activate or delay activation of the pending claims being added this block
for (tx_num, nout), staged in self.pending_claims.items():
self.claimtrie_stash.extend(get_delayed_activate_ops(
for (tx_num, nout), staged in self.txo_to_claim.items():
self.db_op_stack.extend(get_delayed_activate_ops(
staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False
))
# and the supports
for (tx_num, nout), (claim_hash, amount) in self.pending_support_txos.items():
if claim_hash in self.staged_pending_abandoned:
for (tx_num, nout), (claim_hash, amount) in self.support_txo_to_claim.items():
if claim_hash in self.abandoned_claims:
continue
elif claim_hash in self.pending_claim_txos:
name = self.pending_claims[self.pending_claim_txos[claim_hash]].name
staged_is_new_claim = not self.pending_claims[self.pending_claim_txos[claim_hash]].is_update
elif claim_hash in self.claim_hash_to_txo:
name = self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].name
staged_is_new_claim = not self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].is_update
else:
supported_claim_info = self.db.get_claim_txo(claim_hash)
if not supported_claim_info:
@ -874,14 +874,14 @@ class BlockProcessor:
v = supported_claim_info
name = v.name
staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position)
self.claimtrie_stash.extend(get_delayed_activate_ops(
self.db_op_stack.extend(get_delayed_activate_ops(
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
))
# add the activation/delayed-activation ops
for activated, activated_txos in activated_at_height.items():
controlling = get_controlling(activated.name)
if activated.claim_hash in self.staged_pending_abandoned:
if activated.claim_hash in self.abandoned_claims:
continue
reactivate = False
if not controlling or controlling.claim_hash == activated.claim_hash:
@ -889,24 +889,24 @@ class BlockProcessor:
reactivate = True
for activated_txo in activated_txos:
if activated_txo.is_support and (activated_txo.tx_num, activated_txo.position) in \
self.pending_removed_support[activated.name][activated.claim_hash]:
self.removed_support_txos_by_name_by_claim[activated.name][activated.claim_hash]:
# print("\tskip activate support for pending abandoned claim")
continue
if activated_txo.is_claim:
txo_type = ACTIVATED_CLAIM_TXO_TYPE
txo_tup = (activated_txo.tx_num, activated_txo.position)
if txo_tup in self.pending_claims:
amount = self.pending_claims[txo_tup].amount
if txo_tup in self.txo_to_claim:
amount = self.txo_to_claim[txo_tup].amount
else:
amount = self.db.get_claim_txo_amount(
activated.claim_hash
)
self.staged_activated_claim[(activated.name, activated.claim_hash)] = amount
self.activated_claim_amount_by_name_and_hash[(activated.name, activated.claim_hash)] = amount
else:
txo_type = ACTIVATED_SUPPORT_TXO_TYPE
txo_tup = (activated_txo.tx_num, activated_txo.position)
if txo_tup in self.pending_support_txos:
amount = self.pending_support_txos[txo_tup][1]
if txo_tup in self.support_txo_to_claim:
amount = self.support_txo_to_claim[txo_tup][1]
else:
amount = self.db.get_support_txo_amount(
activated.claim_hash, activated_txo.tx_num, activated_txo.position
@ -914,8 +914,8 @@ class BlockProcessor:
if amount is None:
# print("\tskip activate support for non existent claim")
continue
self.staged_activated_support[activated.claim_hash].append(amount)
self.pending_activated[activated.name][activated.claim_hash].append((activated_txo, amount))
self.activated_support_amount_by_claim[activated.claim_hash].append(amount)
self.activation_by_claim_by_name[activated.name][activated.claim_hash].append((activated_txo, amount))
# print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} "
# f"{activated.claim_hash.hex()} @ {activated_txo.height}")
@ -928,14 +928,14 @@ class BlockProcessor:
# add existing claims to the queue for the takeover
# track that we need to reactivate these if one of them becomes controlling
for candidate_claim_hash, (tx_num, nout) in existing.items():
if candidate_claim_hash in self.staged_pending_abandoned:
if candidate_claim_hash in self.abandoned_claims:
continue
has_candidate = True
existing_activation = self.db.get_activation(tx_num, nout)
activate_key = PendingActivationKey(
existing_activation, ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout
)
self.pending_activated[need_takeover][candidate_claim_hash].append((
self.activation_by_claim_by_name[need_takeover][candidate_claim_hash].append((
activate_key, self.db.get_claim_txo_amount(candidate_claim_hash)
))
need_reactivate_if_takes_over[(need_takeover, candidate_claim_hash)] = activate_key
@ -944,7 +944,7 @@ class BlockProcessor:
if not has_candidate:
# remove name takeover entry, the name is now unclaimed
controlling = get_controlling(need_takeover)
self.claimtrie_stash.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
self.db_op_stack.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
# scan for possible takeovers out of the accumulated activations, of these make sure there
# aren't any future activations for the taken over names with yet higher amounts, if there are
@ -961,40 +961,40 @@ class BlockProcessor:
activated.name, activated.claim_hash, activated_txos[-1].height + 1
)
if activated.claim_hash not in claim_exists:
claim_exists[activated.claim_hash] = activated.claim_hash in self.pending_claim_txos or (
claim_exists[activated.claim_hash] = activated.claim_hash in self.claim_hash_to_txo or (
self.db.get_claim_txo(activated.claim_hash) is not None)
if claim_exists[activated.claim_hash] and activated.claim_hash not in self.staged_pending_abandoned:
if claim_exists[activated.claim_hash] and activated.claim_hash not in self.abandoned_claims:
v = future_amount, activated, activated_txos[-1]
future_activations[activated.name][activated.claim_hash] = v
for name, future_activated in activate_in_future.items():
for claim_hash, activated in future_activated.items():
if claim_hash not in claim_exists:
claim_exists[claim_hash] = claim_hash in self.pending_claim_txos or (
claim_exists[claim_hash] = claim_hash in self.claim_hash_to_txo or (
self.db.get_claim_txo(claim_hash) is not None)
if not claim_exists[claim_hash]:
continue
if claim_hash in self.staged_pending_abandoned:
if claim_hash in self.abandoned_claims:
continue
for txo in activated:
v = txo[1], PendingActivationValue(claim_hash, name), txo[0]
future_activations[name][claim_hash] = v
if txo[0].is_claim:
self.possible_future_activated_claim[(name, claim_hash)] = txo[1]
self.possible_future_claim_amount_by_name_and_hash[(name, claim_hash)] = txo[1]
else:
self.possible_future_activated_support[claim_hash].append(txo[1])
self.possible_future_support_amounts_by_claim_hash[claim_hash].append(txo[1])
# process takeovers
checked_names = set()
for name, activated in self.pending_activated.items():
for name, activated in self.activation_by_claim_by_name.items():
checked_names.add(name)
controlling = controlling_claims[name]
amounts = {
claim_hash: self._get_pending_effective_amount(name, claim_hash)
for claim_hash in activated.keys() if claim_hash not in self.staged_pending_abandoned
for claim_hash in activated.keys() if claim_hash not in self.abandoned_claims
}
# if there is a controlling claim include it in the amounts to ensure it remains the max
if controlling and controlling.claim_hash not in self.staged_pending_abandoned:
if controlling and controlling.claim_hash not in self.abandoned_claims:
amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash)
winning_claim_hash = max(amounts, key=lambda x: amounts[x])
if not controlling or (winning_claim_hash != controlling.claim_hash and
@ -1018,14 +1018,14 @@ class BlockProcessor:
# print(f"\ttakeover by {winning_claim_hash.hex()} triggered early activation and "
# f"takeover by {winning_including_future_activations.hex()} at {height}")
# handle a pending activated claim jumping the takeover delay when another name takes over
if winning_including_future_activations not in self.pending_claim_txos:
if winning_including_future_activations not in self.claim_hash_to_txo:
claim = self.db.get_claim_txo(winning_including_future_activations)
tx_num = claim.tx_num
position = claim.position
amount = claim.amount
activation = self.db.get_activation(tx_num, position)
else:
tx_num, position = self.pending_claim_txos[winning_including_future_activations]
tx_num, position = self.claim_hash_to_txo[winning_including_future_activations]
amount = None
activation = None
for (k, tx_amount) in activate_in_future[name][winning_including_future_activations]:
@ -1035,13 +1035,13 @@ class BlockProcessor:
break
assert None not in (amount, activation)
# update the claim that's activating early
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, activation, name, amount
).get_remove_activate_ops()
)
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, height, name, amount
@ -1049,21 +1049,21 @@ class BlockProcessor:
)
for (k, amount) in activate_in_future[name][winning_including_future_activations]:
txo = (k.tx_num, k.position)
if txo in self.possible_future_support_txos[winning_including_future_activations]:
if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]:
t = ACTIVATED_SUPPORT_TXO_TYPE
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
t, winning_including_future_activations, k.tx_num,
k.position, k.height, name, amount
).get_remove_activate_ops()
)
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
t, winning_including_future_activations, k.tx_num,
k.position, height, name, amount
).get_activate_ops()
)
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
elif not controlling or (winning_claim_hash != controlling.claim_hash and
name in names_with_abandoned_controlling_claims) or \
((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])):
@ -1073,27 +1073,27 @@ class BlockProcessor:
amount = self.db.get_claim_txo_amount(
winning_claim_hash
)
if winning_claim_hash in self.pending_claim_txos:
tx_num, position = self.pending_claim_txos[winning_claim_hash]
amount = self.pending_claims[(tx_num, position)].amount
if winning_claim_hash in self.claim_hash_to_txo:
tx_num, position = self.claim_hash_to_txo[winning_claim_hash]
amount = self.txo_to_claim[(tx_num, position)].amount
else:
tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position
if previous_pending_activate.height > height:
# the claim had a pending activation in the future, move it to now
if tx_num < self.tx_count:
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, previous_pending_activate.height, name, amount
).get_remove_activate_ops()
)
self.claimtrie_stash.extend(
self.db_op_stack.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, height, name, amount
).get_activate_ops()
)
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
elif winning_claim_hash == controlling.claim_hash:
# print("\tstill winning")
pass
@ -1109,22 +1109,22 @@ class BlockProcessor:
controlling = get_controlling(name)
amounts = {
claim_hash: self._get_pending_effective_amount(name, claim_hash)
for claim_hash in self.db.get_claims_for_name(name) if claim_hash not in self.staged_pending_abandoned
for claim_hash in self.db.get_claims_for_name(name) if claim_hash not in self.abandoned_claims
}
if controlling and controlling.claim_hash not in self.staged_pending_abandoned:
if controlling and controlling.claim_hash not in self.abandoned_claims:
amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash)
winning = max(amounts, key=lambda x: amounts[x])
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning, height, controlling))
self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling))
# gather cumulative removed/touched sets to update the search index
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
self.removed_claims_to_send_es.update(set(self.abandoned_claims.keys()))
self.touched_claims_to_send_es.update(
set(self.staged_activated_support.keys()).union(
set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())
set(self.activated_support_amount_by_claim.keys()).union(
set(claim_hash for (_, claim_hash) in self.activated_claim_amount_by_name_and_hash.keys())
).union(self.signatures_changed).union(
set(self.removed_active_support.keys())
set(self.removed_active_support_amount_by_claim.keys())
).difference(self.removed_claims_to_send_es)
)
@ -1133,37 +1133,36 @@ class BlockProcessor:
removed_claim = self.db.get_claim_txo(removed)
if removed_claim:
amt = self.db.get_url_effective_amount(
removed_claim.name, removed_claim.tx_num, removed_claim.position, removed
removed_claim.name, removed
)
if amt and amt > 0:
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
removed_claim.name, amt, removed_claim.tx_num,
removed_claim.position, removed
if amt:
self.db_op_stack.extend(get_remove_effective_amount_ops(
removed_claim.name, amt.effective_amount, amt.tx_num,
amt.position, removed
))
for touched in self.touched_claims_to_send_es:
if touched in self.pending_claim_txos:
pending = self.pending_claims[self.pending_claim_txos[touched]]
if touched in self.claim_hash_to_txo:
pending = self.txo_to_claim[self.claim_hash_to_txo[touched]]
name, tx_num, position = pending.name, pending.tx_num, pending.position
claim_from_db = self.db.get_claim_txo(touched)
if claim_from_db:
amount = self.db.get_url_effective_amount(
name, claim_from_db.tx_num, claim_from_db.position, touched
)
if amount and amount > 0:
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
name, amount, claim_from_db.tx_num, claim_from_db.position, touched
claim_amount_info = self.db.get_url_effective_amount(name, touched)
if claim_amount_info:
self.db_op_stack.extend(get_remove_effective_amount_ops(
name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
claim_amount_info.position, touched
))
else:
v = self.db.get_claim_txo(touched)
if not v:
continue
name, tx_num, position = v.name, v.tx_num, v.position
amt = self.db.get_url_effective_amount(name, tx_num, position, touched)
if amt and amt > 0:
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
name, amt, tx_num, position, touched
amt = self.db.get_url_effective_amount(name, touched)
if amt:
self.db_op_stack.extend(get_remove_effective_amount_ops(
name, amt.effective_amount, amt.tx_num, amt.position, touched
))
self.claimtrie_stash.extend(
self.db_op_stack.extend(
get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
tx_num, position, touched)
)
@ -1184,7 +1183,7 @@ class BlockProcessor:
# Use local vars for speed in the loops
put_utxo = self.utxo_cache.__setitem__
claimtrie_stash_extend = self.claimtrie_stash.extend
claimtrie_stash_extend = self.db_op_stack.extend
spend_utxo = self.spend_utxo
undo_info_append = undo_info.append
update_touched = self.touched.update
@ -1261,7 +1260,7 @@ class BlockProcessor:
self.tx_count = tx_count
self.db.tx_counts.append(self.tx_count)
undo_claims = b''.join(op.invert().pack() for op in self.claimtrie_stash)
undo_claims = b''.join(op.invert().pack() for op in self.db_op_stack)
# print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash)))
if height >= self.daemon.cached_height() - self.env.reorg_limit:
@ -1275,20 +1274,20 @@ class BlockProcessor:
self.db.flush_dbs(self.flush_data())
self.claimtrie_stash.clear()
self.pending_claims.clear()
self.pending_claim_txos.clear()
self.pending_supports.clear()
self.pending_support_txos.clear()
self.pending_removed_support.clear()
self.staged_pending_abandoned.clear()
self.removed_active_support.clear()
self.staged_activated_support.clear()
self.staged_activated_claim.clear()
self.pending_activated.clear()
self.possible_future_activated_claim.clear()
self.possible_future_activated_support.clear()
self.possible_future_support_txos.clear()
self.db_op_stack.clear()
self.txo_to_claim.clear()
self.claim_hash_to_txo.clear()
self.support_txos_by_claim.clear()
self.support_txo_to_claim.clear()
self.removed_support_txos_by_name_by_claim.clear()
self.abandoned_claims.clear()
self.removed_active_support_amount_by_claim.clear()
self.activated_support_amount_by_claim.clear()
self.activated_claim_amount_by_name_and_hash.clear()
self.activation_by_claim_by_name.clear()
self.possible_future_claim_amount_by_name_and_hash.clear()
self.possible_future_support_amounts_by_claim_hash.clear()
self.possible_future_support_txos_by_claim_hash.clear()
self.pending_channels.clear()
self.amount_cache.clear()
self.signatures_changed.clear()
@ -1523,7 +1522,7 @@ class BlockProcessor:
self._caught_up_event = caught_up_event
try:
await self.db.open_dbs()
self.claimtrie_stash = RevertableOpStack(self.db.db.get)
self.db_op_stack = RevertableOpStack(self.db.db.get)
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count

View file

@ -18,6 +18,10 @@ class RevertableOp:
self.key = key
self.value = value
@property
def is_delete(self) -> bool:
return not self.is_put
def invert(self) -> 'RevertableOp':
raise NotImplementedError()
@ -26,7 +30,7 @@ class RevertableOp:
Serialize to bytes
"""
return struct.pack(
f'>BHH{len(self.key)}s{len(self.value)}s', self.is_put, len(self.key), len(self.value), self.key,
f'>BHH{len(self.key)}s{len(self.value)}s', int(self.is_put), len(self.key), len(self.value), self.key,
self.value
)
@ -76,12 +80,16 @@ class RevertableDelete(RevertableOp):
class RevertablePut(RevertableOp):
is_put = 1
is_put = True
def invert(self):
return RevertableDelete(self.key, self.value)
class OpStackIntegrity(Exception):
pass
class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]):
self._get = get_fn
@ -90,24 +98,27 @@ class RevertableOpStack:
def append(self, op: RevertableOp):
inverted = op.invert()
if self._items[op.key] and inverted == self._items[op.key][-1]:
self._items[op.key].pop()
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
return
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
pass # raise an error?
else:
if op.is_put:
stored = self._get(op.key)
if stored is not None:
assert RevertableDelete(op.key, stored) in self._items[op.key], \
f"db op tries to add on top of existing key: {op}"
self._items[op.key].append(op)
else:
stored = self._get(op.key)
if stored is not None and stored != op.value:
assert RevertableDelete(op.key, stored) in self._items[op.key], f"delete {op}"
else:
assert stored is not None, f"db op tries to delete nonexistent key: {op}"
assert stored == op.value, f"db op tries to delete with incorrect value: {op}"
self._items[op.key].append(op)
return # raise an error?
stored_val = self._get(op.key)
has_stored_val = stored_val is not None
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
if op.is_put and has_stored_val and not will_delete_existing_stored:
raise OpStackIntegrity(
f"db op tries to add on top of existing key without deleting first: {op}"
)
elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"delete {op}")
elif op.is_delete and not has_stored_val:
raise OpStackIntegrity("db op tries to delete nonexistent key: {op}")
elif op.is_delete and stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
self._items[op.key].append(op)
def extend(self, ops: Iterable[RevertableOp]):
for op in ops:

View file

@ -394,14 +394,11 @@ class LevelDB:
return support_only
return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1)
def get_url_effective_amount(self, name: str, tx_num: int, position: int, claim_hash: bytes):
def get_url_effective_amount(self, name: str, claim_hash: bytes):
for _k, _v in self.db.iterator(prefix=Prefixes.effective_amount.pack_partial_key(name)):
v = Prefixes.effective_amount.unpack_value(_v)
if v.claim_hash == claim_hash:
k = Prefixes.effective_amount.unpack_key(_k)
if k.tx_num == tx_num and k.position == position:
return k.effective_amount
return
return Prefixes.effective_amount.unpack_key(_k)
def get_claims_for_name(self, name):
claims = []
@ -654,7 +651,6 @@ class LevelDB:
k = Prefixes.pending_activation.unpack_key(_k)
v = Prefixes.pending_activation.unpack_value(_v)
activated[v].append(k)
return activated
async def _read_tx_counts(self):
@ -992,7 +988,7 @@ class LevelDB:
batch_delete(op.key)
flush_data.undo.clear()
flush_data.claimtrie_stash.clear()
flush_data.db_op_stack.clear()
while self.fs_height > flush_data.height:
self.fs_height -= 1