diff --git a/hub/db/db.py b/hub/db/db.py index 92209c6..7c5143c 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -1137,6 +1137,17 @@ class SecondaryDB: return self.tx_num_mapping[tx_hash] return self.prefix_db.tx_num.get(tx_hash).tx_num + def get_tx_nums(self, tx_hashes: List[bytes]) -> Dict[bytes, Optional[int]]: + if not tx_hashes: + return {} + if self._cache_all_tx_hashes: + return {tx_hash: self.tx_num_mapping[tx_hash] for tx_hash in tx_hashes} + return { + k: None if not v else v.tx_num for k, v in zip( + tx_hashes, self.prefix_db.tx_num.multi_get([(tx_hash,) for tx_hash in tx_hashes]) + ) + } + def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: if self._cache_all_claim_txos: return self.claim_to_txo.get(claim_hash) diff --git a/hub/scribe/service.py b/hub/scribe/service.py index b36e8b6..e82462c 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -504,70 +504,182 @@ class BlockchainProcessorService(BlockchainService): elif txo.is_support: self._add_support(height, txo, tx_num, nout) - def _spend_support_txo(self, height: int, txin: TxInput): - txin_num = self.get_pending_tx_num(txin.prev_hash) - activation = 0 - if (txin_num, txin.prev_idx) in self.support_txo_to_claim: - spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx)) - self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx)) - supported_name = self._get_pending_claim_name(spent_support) - self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) - else: - spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) - if not spent_support: # it is not a support - return - supported_name = self._get_pending_claim_name(spent_support) - if supported_name is not None: - self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append( - (txin_num, txin.prev_idx)) - activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True) - if 0 < activation <= self.height: - self.removed_active_support_amount_by_claim[spent_support].append(support_amount) - self.effective_amount_delta[spent_support] -= support_amount - self.active_support_amount_delta[spent_support] -= support_amount - if supported_name is not None and activation > 0: - self.get_remove_activate_ops( - ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, - support_amount - ) - # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") - self.db.prefix_db.claim_to_support.stage_delete((spent_support, txin_num, txin.prev_idx), (support_amount,)) - self.db.prefix_db.support_to_claim.stage_delete((txin_num, txin.prev_idx), (spent_support,)) - self.pending_support_amount_change[spent_support] -= support_amount - - def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool: - txin_num = self.get_pending_tx_num(txin.prev_hash) - if (txin_num, txin.prev_idx) in self.txo_to_claim: - spent = self.txo_to_claim[(txin_num, txin.prev_idx)] - else: - if not self.db.get_cached_claim_exists(txin_num, txin.prev_idx): - # txo is not a claim - return False - spent_claim_hash_and_name = self.db.get_claim_from_txo( - txin_num, txin.prev_idx + def _spend_claims_and_supports(self, txis: List[TxInput], spent_claims: Dict[bytes, Tuple[int, int, str]]): + tx_nums = self.db.get_tx_nums( + list( + {tx_hash for (tx_hash, nout, _, _) in txis if tx_hash not in self.pending_transaction_num_mapping} ) - assert spent_claim_hash_and_name is not None - spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) - activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=False) - if 0 < activation <= self.height + 1: - self.effective_amount_delta[spent_claim_hash_and_name.claim_hash] -= spent.amount - if self.env.cache_all_claim_txos: - claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) - if not self.db.txo_to_claim[txin_num]: - self.db.txo_to_claim.pop(txin_num) - self.db.claim_to_txo.pop(claim_hash) - if spent.reposted_claim_hash: - self.pending_reposted.add(spent.reposted_claim_hash) - if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims: - self.pending_channel_counts[spent.signing_hash] -= 1 - spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name) - # print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") - self.get_remove_claim_utxo_ops(spent) - return True + ) + need_check_claim_or_support = [] + spent_support_count = 0 + spent_claim_count = 0 + for (tx_hash, nout, _, _) in txis: + if tx_hash in self.pending_transaction_num_mapping: + txin_num = self.pending_transaction_num_mapping[tx_hash] + else: + txin_num = tx_nums[tx_hash] + if not txin_num: + continue - def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims): - if not self._spend_claim_txo(txin, spent_claims): - self._spend_support_txo(height, txin) + if (txin_num, nout) not in self.txo_to_claim and (txin_num, nout) not in self.support_txo_to_claim: + need_check_claim_or_support.append((txin_num, tx_hash, nout)) + spent_supports = { + (txin_num, tx_hash, nout): support_v.claim_hash for (txin_num, tx_hash, nout), support_v in zip( + need_check_claim_or_support, self.db.prefix_db.support_to_claim.multi_get( + [(tx_num, n) for (tx_num, _, n) in need_check_claim_or_support])) + if support_v is not None + } + + spent_support_amounts = { + (tx_num, nout): packed_support_amount.amount for (tx_num, _, nout), packed_support_amount in zip( + spent_supports.keys(), + self.db.prefix_db.claim_to_support.multi_get([ + (claim_hash, tx_num, nout) + for (tx_num, tx_hash, nout), claim_hash in spent_supports.items()] + ) + ) + } + + to_check_spent_claims = [ + (txin_num, tx_hash, nout) for (txin_num, tx_hash, nout) in need_check_claim_or_support + if (txin_num, tx_hash, nout) not in spent_supports + ] + spent_claim_hashes = { + (txin_num, tx_hash, nout): v.claim_hash + for (txin_num, tx_hash, nout), v in zip( + to_check_spent_claims, self.db.prefix_db.txo_to_claim.multi_get( + [(tx_num, n) for (tx_num, _, n) in to_check_spent_claims] + ) + ) if v is not None + } + spent_claim_txos = { + claim_hash: claim_txo for claim_hash, claim_txo in zip( + spent_claim_hashes.values(), self.db.prefix_db.claim_to_txo.multi_get( + [(c_h,) for c_h in spent_claim_hashes.values()] + ) + ) + } + + needed_channels_for_claims = [] + needed_reposted = set() + needed_activations = [] + + for (tx_hash, nout, _, _) in txis: + if tx_hash in self.pending_transaction_num_mapping: + txin_num = self.pending_transaction_num_mapping[tx_hash] + else: + txin_num = tx_nums[tx_hash] + if not txin_num: + continue + if (txin_num, tx_hash, nout) in spent_claim_hashes: + spent_claim_count += 1 + claim_hash = spent_claim_hashes[(txin_num, tx_hash, nout)] + claim = spent_claim_txos[claim_hash] + if claim_hash not in self.doesnt_have_valid_signature: + needed_channels_for_claims.append((claim_hash, claim.tx_num, claim.position)) + + # signing_hash = self.db.get_channel_for_claim(claim_hash, claim.tx_num, claim.position) + # reposted_claim_hash = self.db.get_repost(claim_hash) + needed_reposted.add(claim_hash) + # activation = self.db.get_activation(txin_num, nout, is_support=False) + needed_activations.append((1, txin_num, nout)) + elif (txin_num, tx_hash, nout) in spent_supports: + spent_support_count += 1 + needed_activations.append((2, txin_num, nout)) + # activation = self.db.get_activation(txin_num, nout, is_support=True) + else: + pass + needed_reposted = list(needed_reposted) + reposts = { + claim_hash: v.reposted_claim_hash for claim_hash, v in zip( + needed_reposted, self.db.prefix_db.repost.multi_get([(c_h,) for c_h in needed_reposted]) + ) if v is not None + } + activations = { + (tx_num, nout): -1 if not v else v.height for (_, tx_num, nout), v in zip( + needed_activations, self.db.prefix_db.activated.multi_get(needed_activations) + ) + } + signing_channels = { + k: None if not v else v.signing_hash for k, v in zip( + needed_channels_for_claims, self.db.prefix_db.claim_to_channel.multi_get(needed_channels_for_claims) + ) + } + + for (tx_hash, nout, _, _) in txis: + if tx_hash in self.pending_transaction_num_mapping: + txin_num = self.pending_transaction_num_mapping[tx_hash] + else: + txin_num = tx_nums[tx_hash] + if not txin_num: + continue + if (txin_num, nout) in self.txo_to_claim or (txin_num, tx_hash, nout) in spent_claim_hashes: + if (txin_num, nout) in self.txo_to_claim: + spent = self.txo_to_claim[(txin_num, nout)] + else: + claim_hash = spent_claim_hashes[(txin_num, tx_hash, nout)] + claim = spent_claim_txos[claim_hash] + if claim_hash in self.doesnt_have_valid_signature: + signing_hash = None + else: + signing_hash = signing_channels[(claim_hash, claim.tx_num, claim.position)] + reposted_claim_hash = reposts.get(claim_hash) + spent = StagedClaimtrieItem( + claim.name, claim.normalized_name, claim_hash, claim.amount, + self.coin.get_expiration_height( + bisect_right(self.db.tx_counts, claim.tx_num), + extended=self.height >= self.coin.nExtendedClaimExpirationForkHeight + ), + claim.tx_num, claim.position, claim.root_tx_num, claim.root_position, + claim.channel_signature_is_valid, signing_hash, reposted_claim_hash + ) + activation = activations[(txin_num, nout)] + if 0 < activation <= self.height + 1: + self.effective_amount_delta[claim_hash] -= spent.amount + self.future_effective_amount_delta[spent.claim_hash] -= spent.amount + if self.env.cache_all_claim_txos: + claim_hash = self.db.txo_to_claim[txin_num].pop(nout) + if not self.db.txo_to_claim[txin_num]: + self.db.txo_to_claim.pop(txin_num) + self.db.claim_to_txo.pop(claim_hash) + if spent.reposted_claim_hash: + self.pending_reposted.add(spent.reposted_claim_hash) + if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims: + self.pending_channel_counts[spent.signing_hash] -= 1 + spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name) + # print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") + self.get_remove_claim_utxo_ops(spent) + elif (txin_num, nout) in self.support_txo_to_claim or (txin_num, tx_hash, nout) in spent_supports: + activation = 0 + if (txin_num, nout) in self.support_txo_to_claim: + spent_support, support_amount, supported_name = self.support_txo_to_claim.pop((txin_num, nout)) + self.support_txos_by_claim[spent_support].remove((txin_num, nout)) + self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, nout)) + else: + spent_support = spent_supports[(txin_num, tx_hash, nout)] + support_amount = spent_support_amounts[(txin_num, nout)] + supported_name = self._get_pending_claim_name(spent_support) + if supported_name is not None: + self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append( + (txin_num, nout)) + activation = activations[(txin_num, nout)] + if 0 < activation <= self.height: # TODO: fnord + self.removed_active_support_amount_by_claim[spent_support].append(support_amount) + self.effective_amount_delta[spent_support] -= support_amount + self.active_support_amount_delta[spent_support] -= support_amount + if supported_name is not None and activation > 0: + self.get_remove_activate_ops( + ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, nout, activation, supported_name, + support_amount + ) + # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount} {tx_hash[::-1].hex()}:{nout}") + self.db.prefix_db.claim_to_support.stash_delete((spent_support, txin_num, nout), (support_amount,)) + self.db.prefix_db.support_to_claim.stash_delete((txin_num, nout), (spent_support,)) + self.pending_support_amount_change[spent_support] -= support_amount + self.future_effective_amount_delta[spent_support] -= support_amount + else: + pass + return spent_claim_count, spent_support_count def _abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str): if (tx_num, nout) in self.txo_to_claim: @@ -676,9 +788,8 @@ class BlockchainProcessorService(BlockchainService): expired = self.db.get_expired_by_height(height) self.expired_claim_hashes.update(set(expired.keys())) spent_claims = {} - for expired_claim_hash, (tx_num, position, name, txi) in expired.items(): - if (tx_num, position) not in self.txo_to_claim: - self._spend_claim_txo(txi, spent_claims) + txis = [txi for expired_claim_hash, (tx_num, position, name, txi) in expired.items() if (tx_num, position) not in self.txo_to_claim] + self._spend_claims_and_supports(txis, spent_claims) if expired: # abandon the channels last to handle abandoned signed claims in the same tx, # see test_abandon_channel_and_claims_in_same_tx @@ -1391,17 +1502,23 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,)) def advance_block(self, block: Block): + txo_count = 0 txi_count = 0 + claim_added_count = 0 + support_added_count = 0 + claim_spent_count = 0 + support_spent_count = 0 + abandoned_cnt = 0 + abandoned_chans_cnt = 0 height = self.height + 1 # print("advance ", height) # Use local vars for speed in the loops tx_count = self.tx_count spend_utxos = self.spend_utxos add_utxo = self.add_utxo - spend_claim_or_support_txo = self._spend_claim_or_support_txo add_claim_or_support = self._add_claim_or_support + # spend_claim_or_support = self._spend_claim_or_support_txo txs: List[Tuple[Tx, bytes]] = block.transactions - txo_count = 0 self.db.prefix_db.block_hash.stash_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),)) self.db.prefix_db.header.stash_put(key_args=(height,), value_args=(block.header,)) @@ -1428,6 +1545,8 @@ class BlockchainProcessorService(BlockchainService): if txin.is_generation(): continue append_spent_txo((txin.prev_hash, txin.prev_idx)) + spent_claims_cnt, spent_supports_cnt = self._spend_claims_and_supports(tx.inputs, spent_claims) + claim_spent_count += spent_claims_cnt support_spent_count += spent_supports_cnt spend_utxos(tx_count, spent_txos)