batch spend claims and supports

This commit is contained in:
Jack Robison 2022-10-16 14:36:13 -04:00
parent 6a9a2ad40f
commit 2150363108
2 changed files with 197 additions and 67 deletions

View file

@ -1137,6 +1137,17 @@ class SecondaryDB:
return self.tx_num_mapping[tx_hash]
return self.prefix_db.tx_num.get(tx_hash).tx_num
def get_tx_nums(self, tx_hashes: List[bytes]) -> Dict[bytes, Optional[int]]:
if not tx_hashes:
return {}
if self._cache_all_tx_hashes:
return {tx_hash: self.tx_num_mapping[tx_hash] for tx_hash in tx_hashes}
return {
k: None if not v else v.tx_num for k, v in zip(
tx_hashes, self.prefix_db.tx_num.multi_get([(tx_hash,) for tx_hash in tx_hashes])
)
}
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self._cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)

View file

@ -504,70 +504,182 @@ class BlockchainProcessorService(BlockchainService):
elif txo.is_support:
self._add_support(height, txo, tx_num, nout)
def _spend_support_txo(self, height: int, txin: TxInput):
txin_num = self.get_pending_tx_num(txin.prev_hash)
activation = 0
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx))
supported_name = self._get_pending_claim_name(spent_support)
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
else:
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
if not spent_support: # it is not a support
return
supported_name = self._get_pending_claim_name(spent_support)
if supported_name is not None:
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append(
(txin_num, txin.prev_idx))
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True)
if 0 < activation <= self.height:
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
self.effective_amount_delta[spent_support] -= support_amount
self.active_support_amount_delta[spent_support] -= support_amount
if supported_name is not None and activation > 0:
self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
support_amount
)
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
self.db.prefix_db.claim_to_support.stage_delete((spent_support, txin_num, txin.prev_idx), (support_amount,))
self.db.prefix_db.support_to_claim.stage_delete((txin_num, txin.prev_idx), (spent_support,))
self.pending_support_amount_change[spent_support] -= support_amount
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool:
txin_num = self.get_pending_tx_num(txin.prev_hash)
if (txin_num, txin.prev_idx) in self.txo_to_claim:
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
else:
if not self.db.get_cached_claim_exists(txin_num, txin.prev_idx):
# txo is not a claim
return False
spent_claim_hash_and_name = self.db.get_claim_from_txo(
txin_num, txin.prev_idx
def _spend_claims_and_supports(self, txis: List[TxInput], spent_claims: Dict[bytes, Tuple[int, int, str]]):
tx_nums = self.db.get_tx_nums(
list(
{tx_hash for (tx_hash, nout, _, _) in txis if tx_hash not in self.pending_transaction_num_mapping}
)
assert spent_claim_hash_and_name is not None
spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash)
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=False)
if 0 < activation <= self.height + 1:
self.effective_amount_delta[spent_claim_hash_and_name.claim_hash] -= spent.amount
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if spent.reposted_claim_hash:
self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
self.pending_channel_counts[spent.signing_hash] -= 1
spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name)
# print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
self.get_remove_claim_utxo_ops(spent)
return True
)
need_check_claim_or_support = []
spent_support_count = 0
spent_claim_count = 0
for (tx_hash, nout, _, _) in txis:
if tx_hash in self.pending_transaction_num_mapping:
txin_num = self.pending_transaction_num_mapping[tx_hash]
else:
txin_num = tx_nums[tx_hash]
if not txin_num:
continue
def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims):
if not self._spend_claim_txo(txin, spent_claims):
self._spend_support_txo(height, txin)
if (txin_num, nout) not in self.txo_to_claim and (txin_num, nout) not in self.support_txo_to_claim:
need_check_claim_or_support.append((txin_num, tx_hash, nout))
spent_supports = {
(txin_num, tx_hash, nout): support_v.claim_hash for (txin_num, tx_hash, nout), support_v in zip(
need_check_claim_or_support, self.db.prefix_db.support_to_claim.multi_get(
[(tx_num, n) for (tx_num, _, n) in need_check_claim_or_support]))
if support_v is not None
}
spent_support_amounts = {
(tx_num, nout): packed_support_amount.amount for (tx_num, _, nout), packed_support_amount in zip(
spent_supports.keys(),
self.db.prefix_db.claim_to_support.multi_get([
(claim_hash, tx_num, nout)
for (tx_num, tx_hash, nout), claim_hash in spent_supports.items()]
)
)
}
to_check_spent_claims = [
(txin_num, tx_hash, nout) for (txin_num, tx_hash, nout) in need_check_claim_or_support
if (txin_num, tx_hash, nout) not in spent_supports
]
spent_claim_hashes = {
(txin_num, tx_hash, nout): v.claim_hash
for (txin_num, tx_hash, nout), v in zip(
to_check_spent_claims, self.db.prefix_db.txo_to_claim.multi_get(
[(tx_num, n) for (tx_num, _, n) in to_check_spent_claims]
)
) if v is not None
}
spent_claim_txos = {
claim_hash: claim_txo for claim_hash, claim_txo in zip(
spent_claim_hashes.values(), self.db.prefix_db.claim_to_txo.multi_get(
[(c_h,) for c_h in spent_claim_hashes.values()]
)
)
}
needed_channels_for_claims = []
needed_reposted = set()
needed_activations = []
for (tx_hash, nout, _, _) in txis:
if tx_hash in self.pending_transaction_num_mapping:
txin_num = self.pending_transaction_num_mapping[tx_hash]
else:
txin_num = tx_nums[tx_hash]
if not txin_num:
continue
if (txin_num, tx_hash, nout) in spent_claim_hashes:
spent_claim_count += 1
claim_hash = spent_claim_hashes[(txin_num, tx_hash, nout)]
claim = spent_claim_txos[claim_hash]
if claim_hash not in self.doesnt_have_valid_signature:
needed_channels_for_claims.append((claim_hash, claim.tx_num, claim.position))
# signing_hash = self.db.get_channel_for_claim(claim_hash, claim.tx_num, claim.position)
# reposted_claim_hash = self.db.get_repost(claim_hash)
needed_reposted.add(claim_hash)
# activation = self.db.get_activation(txin_num, nout, is_support=False)
needed_activations.append((1, txin_num, nout))
elif (txin_num, tx_hash, nout) in spent_supports:
spent_support_count += 1
needed_activations.append((2, txin_num, nout))
# activation = self.db.get_activation(txin_num, nout, is_support=True)
else:
pass
needed_reposted = list(needed_reposted)
reposts = {
claim_hash: v.reposted_claim_hash for claim_hash, v in zip(
needed_reposted, self.db.prefix_db.repost.multi_get([(c_h,) for c_h in needed_reposted])
) if v is not None
}
activations = {
(tx_num, nout): -1 if not v else v.height for (_, tx_num, nout), v in zip(
needed_activations, self.db.prefix_db.activated.multi_get(needed_activations)
)
}
signing_channels = {
k: None if not v else v.signing_hash for k, v in zip(
needed_channels_for_claims, self.db.prefix_db.claim_to_channel.multi_get(needed_channels_for_claims)
)
}
for (tx_hash, nout, _, _) in txis:
if tx_hash in self.pending_transaction_num_mapping:
txin_num = self.pending_transaction_num_mapping[tx_hash]
else:
txin_num = tx_nums[tx_hash]
if not txin_num:
continue
if (txin_num, nout) in self.txo_to_claim or (txin_num, tx_hash, nout) in spent_claim_hashes:
if (txin_num, nout) in self.txo_to_claim:
spent = self.txo_to_claim[(txin_num, nout)]
else:
claim_hash = spent_claim_hashes[(txin_num, tx_hash, nout)]
claim = spent_claim_txos[claim_hash]
if claim_hash in self.doesnt_have_valid_signature:
signing_hash = None
else:
signing_hash = signing_channels[(claim_hash, claim.tx_num, claim.position)]
reposted_claim_hash = reposts.get(claim_hash)
spent = StagedClaimtrieItem(
claim.name, claim.normalized_name, claim_hash, claim.amount,
self.coin.get_expiration_height(
bisect_right(self.db.tx_counts, claim.tx_num),
extended=self.height >= self.coin.nExtendedClaimExpirationForkHeight
),
claim.tx_num, claim.position, claim.root_tx_num, claim.root_position,
claim.channel_signature_is_valid, signing_hash, reposted_claim_hash
)
activation = activations[(txin_num, nout)]
if 0 < activation <= self.height + 1:
self.effective_amount_delta[claim_hash] -= spent.amount
self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if spent.reposted_claim_hash:
self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
self.pending_channel_counts[spent.signing_hash] -= 1
spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name)
# print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
self.get_remove_claim_utxo_ops(spent)
elif (txin_num, nout) in self.support_txo_to_claim or (txin_num, tx_hash, nout) in spent_supports:
activation = 0
if (txin_num, nout) in self.support_txo_to_claim:
spent_support, support_amount, supported_name = self.support_txo_to_claim.pop((txin_num, nout))
self.support_txos_by_claim[spent_support].remove((txin_num, nout))
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, nout))
else:
spent_support = spent_supports[(txin_num, tx_hash, nout)]
support_amount = spent_support_amounts[(txin_num, nout)]
supported_name = self._get_pending_claim_name(spent_support)
if supported_name is not None:
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append(
(txin_num, nout))
activation = activations[(txin_num, nout)]
if 0 < activation <= self.height: # TODO: fnord
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
self.effective_amount_delta[spent_support] -= support_amount
self.active_support_amount_delta[spent_support] -= support_amount
if supported_name is not None and activation > 0:
self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, nout, activation, supported_name,
support_amount
)
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount} {tx_hash[::-1].hex()}:{nout}")
self.db.prefix_db.claim_to_support.stash_delete((spent_support, txin_num, nout), (support_amount,))
self.db.prefix_db.support_to_claim.stash_delete((txin_num, nout), (spent_support,))
self.pending_support_amount_change[spent_support] -= support_amount
self.future_effective_amount_delta[spent_support] -= support_amount
else:
pass
return spent_claim_count, spent_support_count
def _abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str):
if (tx_num, nout) in self.txo_to_claim:
@ -676,9 +788,8 @@ class BlockchainProcessorService(BlockchainService):
expired = self.db.get_expired_by_height(height)
self.expired_claim_hashes.update(set(expired.keys()))
spent_claims = {}
for expired_claim_hash, (tx_num, position, name, txi) in expired.items():
if (tx_num, position) not in self.txo_to_claim:
self._spend_claim_txo(txi, spent_claims)
txis = [txi for expired_claim_hash, (tx_num, position, name, txi) in expired.items() if (tx_num, position) not in self.txo_to_claim]
self._spend_claims_and_supports(txis, spent_claims)
if expired:
# abandon the channels last to handle abandoned signed claims in the same tx,
# see test_abandon_channel_and_claims_in_same_tx
@ -1391,17 +1502,23 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block):
txo_count = 0
txi_count = 0
claim_added_count = 0
support_added_count = 0
claim_spent_count = 0
support_spent_count = 0
abandoned_cnt = 0
abandoned_chans_cnt = 0
height = self.height + 1
# print("advance ", height)
# Use local vars for speed in the loops
tx_count = self.tx_count
spend_utxos = self.spend_utxos
add_utxo = self.add_utxo
spend_claim_or_support_txo = self._spend_claim_or_support_txo
add_claim_or_support = self._add_claim_or_support
# spend_claim_or_support = self._spend_claim_or_support_txo
txs: List[Tuple[Tx, bytes]] = block.transactions
txo_count = 0
self.db.prefix_db.block_hash.stash_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
self.db.prefix_db.header.stash_put(key_args=(height,), value_args=(block.header,))
@ -1428,6 +1545,8 @@ class BlockchainProcessorService(BlockchainService):
if txin.is_generation():
continue
append_spent_txo((txin.prev_hash, txin.prev_idx))
spent_claims_cnt, spent_supports_cnt = self._spend_claims_and_supports(tx.inputs, spent_claims)
claim_spent_count += spent_claims_cnt
support_spent_count += spent_supports_cnt
spend_utxos(tx_count, spent_txos)