From cf66c2a1ee26585c6a58cdeff6b01f439698a6a1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 1 Jul 2021 17:23:27 -0400 Subject: [PATCH] rename things -fix effective amount integrity error --- lbry/wallet/server/block_processor.py | 287 +++++++++++++------------- lbry/wallet/server/db/revertable.py | 49 +++-- lbry/wallet/server/leveldb.py | 10 +- 3 files changed, 176 insertions(+), 170 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 2d04f1ecd..155322a92 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -207,7 +207,7 @@ class BlockProcessor: self.db_deletes = [] # Claimtrie cache - self.claimtrie_stash = None + self.db_op_stack = None self.undo_claims = [] # If the lock is successfully acquired, in-memory chain state @@ -223,31 +223,31 @@ class BlockProcessor: ################################# # txo to pending claim - self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} + self.txo_to_claim: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} # claim hash to pending claim txo - self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {} + self.claim_hash_to_txo: typing.Dict[bytes, Tuple[int, int]] = {} # claim hash to lists of pending support txos - self.pending_supports: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) + self.support_txos_by_claim: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) # support txo: (supported claim hash, support amount) - self.pending_support_txos: Dict[Tuple[int, int], Tuple[bytes, int]] = {} + self.support_txo_to_claim: Dict[Tuple[int, int], Tuple[bytes, int]] = {} # removed supports {name: {claim_hash: [(tx_num, nout), ...]}} - self.pending_removed_support: DefaultDict[str, DefaultDict[bytes, List[Tuple[int, int]]]] = defaultdict( - lambda: defaultdict(list)) - self.staged_pending_abandoned: Dict[bytes, StagedClaimtrieItem] = {} + self.removed_support_txos_by_name_by_claim: DefaultDict[str, DefaultDict[bytes, List[Tuple[int, int]]]] = \ + defaultdict(lambda: defaultdict(list)) + self.abandoned_claims: Dict[bytes, StagedClaimtrieItem] = {} # removed activated support amounts by claim hash - self.removed_active_support: DefaultDict[bytes, List[int]] = defaultdict(list) + self.removed_active_support_amount_by_claim: DefaultDict[bytes, List[int]] = defaultdict(list) # pending activated support amounts by claim hash - self.staged_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list) + self.activated_support_amount_by_claim: DefaultDict[bytes, List[int]] = defaultdict(list) # pending activated name and claim hash to claim/update txo amount - self.staged_activated_claim: Dict[Tuple[str, bytes], int] = {} + self.activated_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {} # pending claim and support activations per claim hash per name, # used to process takeovers due to added activations - self.pending_activated: DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]] = \ - defaultdict(lambda: defaultdict(list)) + activation_by_claim_by_name_type = DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]] + self.activation_by_claim_by_name: activation_by_claim_by_name_type = defaultdict(lambda: defaultdict(list)) # these are used for detecting early takeovers by not yet activated claims/supports - self.possible_future_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list) - self.possible_future_activated_claim: Dict[Tuple[str, bytes], int] = {} - self.possible_future_support_txos: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) + self.possible_future_support_amounts_by_claim_hash: DefaultDict[bytes, List[int]] = defaultdict(list) + self.possible_future_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {} + self.possible_future_support_txos_by_claim_hash: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) self.removed_claims_to_send_es = set() self.touched_claims_to_send_es = set() @@ -438,7 +438,7 @@ class BlockProcessor: """The data for a flush. The lock must be taken.""" assert self.state_lock.locked() return FlushData(self.height, self.tx_count, self.headers, self.block_hashes, - self.block_txs, self.claimtrie_stash, self.undo_infos, self.utxo_cache, + self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip, self.undo_claims) async def flush(self, flush_utxos): @@ -528,8 +528,8 @@ class BlockProcessor: return [] (prev_tx_num, prev_idx, _) = spent_claims.pop(claim_hash) # print(f"\tupdate {claim_hash.hex()} {tx_hash[::-1].hex()} {txo.amount}") - if (prev_tx_num, prev_idx) in self.pending_claims: - previous_claim = self.pending_claims.pop((prev_tx_num, prev_idx)) + if (prev_tx_num, prev_idx) in self.txo_to_claim: + previous_claim = self.txo_to_claim.pop((prev_tx_num, prev_idx)) root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position else: v = self.db.get_claim_txo( @@ -546,15 +546,15 @@ class BlockProcessor: claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash ) - self.pending_claims[(tx_num, nout)] = pending - self.pending_claim_txos[claim_hash] = (tx_num, nout) + self.txo_to_claim[(tx_num, nout)] = pending + self.claim_hash_to_txo[claim_hash] = (tx_num, nout) ops.extend(pending.get_add_claim_utxo_ops()) return ops def _add_support(self, txo: 'Output', tx_num: int, nout: int) -> List['RevertableOp']: supported_claim_hash = txo.claim_hash[::-1] - self.pending_supports[supported_claim_hash].append((tx_num, nout)) - self.pending_support_txos[(tx_num, nout)] = supported_claim_hash, txo.amount + self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout)) + self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount # print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}") return StagedClaimtrieSupport( supported_claim_hash, tx_num, nout, txo.amount @@ -570,12 +570,12 @@ class BlockProcessor: def _spend_support_txo(self, txin): txin_num = self.db.transaction_num_mapping[txin.prev_hash] - if (txin_num, txin.prev_idx) in self.pending_support_txos: - spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx)) - self.pending_supports[spent_support].remove((txin_num, txin.prev_idx)) + if (txin_num, txin.prev_idx) in self.support_txo_to_claim: + spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx)) + self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx)) supported_name = self._get_pending_claim_name(spent_support) # print(f"\tspent support for {spent_support.hex()}") - self.pending_removed_support[supported_name][spent_support].append((txin_num, txin.prev_idx)) + self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) return StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount ).get_spend_support_txo_ops() @@ -583,10 +583,10 @@ class BlockProcessor: if spent_support: supported_name = self._get_pending_claim_name(spent_support) if supported_name is not None: - self.pending_removed_support[supported_name][spent_support].append((txin_num, txin.prev_idx)) + self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True) if 0 < activation < self.height + 1: - self.removed_active_support[spent_support].append(support_amount) + self.removed_active_support_amount_by_claim[spent_support].append(support_amount) # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") ops = StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount @@ -601,8 +601,8 @@ class BlockProcessor: def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]): txin_num = self.db.transaction_num_mapping[txin.prev_hash] - if (txin_num, txin.prev_idx) in self.pending_claims: - spent = self.pending_claims[(txin_num, txin.prev_idx)] + if (txin_num, txin.prev_idx) in self.txo_to_claim: + spent = self.txo_to_claim[(txin_num, txin.prev_idx)] else: spent_claim_hash_and_name = self.db.get_claim_from_txo( txin_num, txin.prev_idx @@ -635,9 +635,9 @@ class BlockProcessor: def _abandon_claim(self, claim_hash, tx_num, nout, name) -> List['RevertableOp']: claim_from_db = False - if (tx_num, nout) in self.pending_claims: - pending = self.pending_claims.pop((tx_num, nout)) - self.staged_pending_abandoned[pending.claim_hash] = pending + if (tx_num, nout) in self.txo_to_claim: + pending = self.txo_to_claim.pop((tx_num, nout)) + self.abandoned_claims[pending.claim_hash] = pending claim_root_tx_num, claim_root_idx = pending.root_tx_num, pending.root_position prev_amount, prev_signing_hash = pending.amount, pending.signing_hash reposted_claim_hash = pending.reposted_claim_hash @@ -653,27 +653,27 @@ class BlockProcessor: prev_signing_hash = self.db.get_channel_for_claim(claim_hash, tx_num, nout) reposted_claim_hash = self.db.get_repost(claim_hash) expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num)) - self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem( + self.abandoned_claims[claim_hash] = staged = StagedClaimtrieItem( name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num, claim_root_idx, signature_is_valid, prev_signing_hash, reposted_claim_hash ) if prev_signing_hash and prev_signing_hash in self.pending_channel_counts: self.pending_channel_counts.pop(prev_signing_hash) - for support_txo_to_clear in self.pending_supports[claim_hash]: - self.pending_support_txos.pop(support_txo_to_clear) - self.pending_supports[claim_hash].clear() - self.pending_supports.pop(claim_hash) + for support_txo_to_clear in self.support_txos_by_claim[claim_hash]: + self.support_txo_to_claim.pop(support_txo_to_clear) + self.support_txos_by_claim[claim_hash].clear() + self.support_txos_by_claim.pop(claim_hash) ops = [] if staged.name.startswith('@'): # abandon a channel, invalidate signatures for k, claim_hash in self.db.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(staged.claim_hash)): - if claim_hash in self.staged_pending_abandoned: + if claim_hash in self.abandoned_claims: continue self.signatures_changed.add(claim_hash) - if claim_hash in self.pending_claim_txos: - claim = self.pending_claims[self.pending_claim_txos[claim_hash]] + if claim_hash in self.claim_hash_to_txo: + claim = self.txo_to_claim[self.claim_hash_to_txo[claim_hash]] else: claim = self.db.get_claim_txo(claim_hash) assert claim is not None @@ -705,7 +705,7 @@ class BlockProcessor: spent_claims = {} ops = [] for expired_claim_hash, (tx_num, position, name, txi) in expired.items(): - if (tx_num, position) not in self.pending_claims: + if (tx_num, position) not in self.txo_to_claim: ops.extend(self._spend_claim_txo(txi, spent_claims)) if expired: # do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned @@ -733,28 +733,28 @@ class BlockProcessor: ) def _get_pending_claim_amount(self, name: str, claim_hash: bytes, height=None) -> int: - if (name, claim_hash) in self.staged_activated_claim: - return self.staged_activated_claim[(name, claim_hash)] - if (name, claim_hash) in self.possible_future_activated_claim: - return self.possible_future_activated_claim[(name, claim_hash)] + if (name, claim_hash) in self.activated_claim_amount_by_name_and_hash: + return self.activated_claim_amount_by_name_and_hash[(name, claim_hash)] + if (name, claim_hash) in self.possible_future_claim_amount_by_name_and_hash: + return self.possible_future_claim_amount_by_name_and_hash[(name, claim_hash)] return self._cached_get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height or (self.height + 1)) def _get_pending_claim_name(self, claim_hash: bytes) -> Optional[str]: assert claim_hash is not None - if claim_hash in self.pending_claims: - return self.pending_claims[claim_hash].name + if claim_hash in self.txo_to_claim: + return self.txo_to_claim[claim_hash].name claim_info = self.db.get_claim_txo(claim_hash) if claim_info: return claim_info.name def _get_pending_supported_amount(self, claim_hash: bytes, height: Optional[int] = None) -> int: amount = self._cached_get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, height or (self.height + 1)) - if claim_hash in self.staged_activated_support: - amount += sum(self.staged_activated_support[claim_hash]) - if claim_hash in self.possible_future_activated_support: - amount += sum(self.possible_future_activated_support[claim_hash]) - if claim_hash in self.removed_active_support: - return amount - sum(self.removed_active_support[claim_hash]) + if claim_hash in self.activated_support_amount_by_claim: + amount += sum(self.activated_support_amount_by_claim[claim_hash]) + if claim_hash in self.possible_future_support_amounts_by_claim_hash: + amount += sum(self.possible_future_support_amounts_by_claim_hash[claim_hash]) + if claim_hash in self.removed_active_support_amount_by_claim: + return amount - sum(self.removed_active_support_amount_by_claim[claim_hash]) return amount def _get_pending_effective_amount(self, name: str, claim_hash: bytes, height: Optional[int] = None) -> int: @@ -815,7 +815,7 @@ class BlockProcessor: ), amount )) if is_support: - self.possible_future_support_txos[claim_hash].append((tx_num, nout)) + self.possible_future_support_txos_by_claim_hash[claim_hash].append((tx_num, nout)) return StagedActivation( ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout, height + delay, name, amount @@ -823,7 +823,7 @@ class BlockProcessor: # determine names needing takeover/deletion due to controlling claims being abandoned # and add ops to deactivate abandoned claims - for claim_hash, staged in self.staged_pending_abandoned.items(): + for claim_hash, staged in self.abandoned_claims.items(): controlling = get_controlling(staged.name) if controlling and controlling.claim_hash == claim_hash: names_with_abandoned_controlling_claims.append(staged.name) @@ -831,7 +831,7 @@ class BlockProcessor: activation = self.db.get_activation(staged.tx_num, staged.position) if activation > 0: # db returns -1 for non-existent txos # removed queued future activation from the db - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, activation, staged.name, staged.amount @@ -843,7 +843,7 @@ class BlockProcessor: # get the removed activated supports for controlling claims to determine if takeovers are possible abandoned_support_check_need_takeover = defaultdict(list) - for claim_hash, amounts in self.removed_active_support.items(): + for claim_hash, amounts in self.removed_active_support_amount_by_claim.items(): name = self._get_pending_claim_name(claim_hash) if name is None: continue @@ -853,18 +853,18 @@ class BlockProcessor: abandoned_support_check_need_takeover[(name, claim_hash)].extend(amounts) # prepare to activate or delay activation of the pending claims being added this block - for (tx_num, nout), staged in self.pending_claims.items(): - self.claimtrie_stash.extend(get_delayed_activate_ops( + for (tx_num, nout), staged in self.txo_to_claim.items(): + self.db_op_stack.extend(get_delayed_activate_ops( staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False )) # and the supports - for (tx_num, nout), (claim_hash, amount) in self.pending_support_txos.items(): - if claim_hash in self.staged_pending_abandoned: + for (tx_num, nout), (claim_hash, amount) in self.support_txo_to_claim.items(): + if claim_hash in self.abandoned_claims: continue - elif claim_hash in self.pending_claim_txos: - name = self.pending_claims[self.pending_claim_txos[claim_hash]].name - staged_is_new_claim = not self.pending_claims[self.pending_claim_txos[claim_hash]].is_update + elif claim_hash in self.claim_hash_to_txo: + name = self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].name + staged_is_new_claim = not self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].is_update else: supported_claim_info = self.db.get_claim_txo(claim_hash) if not supported_claim_info: @@ -874,14 +874,14 @@ class BlockProcessor: v = supported_claim_info name = v.name staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position) - self.claimtrie_stash.extend(get_delayed_activate_ops( + self.db_op_stack.extend(get_delayed_activate_ops( name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True )) # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): controlling = get_controlling(activated.name) - if activated.claim_hash in self.staged_pending_abandoned: + if activated.claim_hash in self.abandoned_claims: continue reactivate = False if not controlling or controlling.claim_hash == activated.claim_hash: @@ -889,24 +889,24 @@ class BlockProcessor: reactivate = True for activated_txo in activated_txos: if activated_txo.is_support and (activated_txo.tx_num, activated_txo.position) in \ - self.pending_removed_support[activated.name][activated.claim_hash]: + self.removed_support_txos_by_name_by_claim[activated.name][activated.claim_hash]: # print("\tskip activate support for pending abandoned claim") continue if activated_txo.is_claim: txo_type = ACTIVATED_CLAIM_TXO_TYPE txo_tup = (activated_txo.tx_num, activated_txo.position) - if txo_tup in self.pending_claims: - amount = self.pending_claims[txo_tup].amount + if txo_tup in self.txo_to_claim: + amount = self.txo_to_claim[txo_tup].amount else: amount = self.db.get_claim_txo_amount( activated.claim_hash ) - self.staged_activated_claim[(activated.name, activated.claim_hash)] = amount + self.activated_claim_amount_by_name_and_hash[(activated.name, activated.claim_hash)] = amount else: txo_type = ACTIVATED_SUPPORT_TXO_TYPE txo_tup = (activated_txo.tx_num, activated_txo.position) - if txo_tup in self.pending_support_txos: - amount = self.pending_support_txos[txo_tup][1] + if txo_tup in self.support_txo_to_claim: + amount = self.support_txo_to_claim[txo_tup][1] else: amount = self.db.get_support_txo_amount( activated.claim_hash, activated_txo.tx_num, activated_txo.position @@ -914,8 +914,8 @@ class BlockProcessor: if amount is None: # print("\tskip activate support for non existent claim") continue - self.staged_activated_support[activated.claim_hash].append(amount) - self.pending_activated[activated.name][activated.claim_hash].append((activated_txo, amount)) + self.activated_support_amount_by_claim[activated.claim_hash].append(amount) + self.activation_by_claim_by_name[activated.name][activated.claim_hash].append((activated_txo, amount)) # print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} " # f"{activated.claim_hash.hex()} @ {activated_txo.height}") @@ -928,14 +928,14 @@ class BlockProcessor: # add existing claims to the queue for the takeover # track that we need to reactivate these if one of them becomes controlling for candidate_claim_hash, (tx_num, nout) in existing.items(): - if candidate_claim_hash in self.staged_pending_abandoned: + if candidate_claim_hash in self.abandoned_claims: continue has_candidate = True existing_activation = self.db.get_activation(tx_num, nout) activate_key = PendingActivationKey( existing_activation, ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout ) - self.pending_activated[need_takeover][candidate_claim_hash].append(( + self.activation_by_claim_by_name[need_takeover][candidate_claim_hash].append(( activate_key, self.db.get_claim_txo_amount(candidate_claim_hash) )) need_reactivate_if_takes_over[(need_takeover, candidate_claim_hash)] = activate_key @@ -944,7 +944,7 @@ class BlockProcessor: if not has_candidate: # remove name takeover entry, the name is now unclaimed controlling = get_controlling(need_takeover) - self.claimtrie_stash.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) + self.db_op_stack.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) # scan for possible takeovers out of the accumulated activations, of these make sure there # aren't any future activations for the taken over names with yet higher amounts, if there are @@ -961,40 +961,40 @@ class BlockProcessor: activated.name, activated.claim_hash, activated_txos[-1].height + 1 ) if activated.claim_hash not in claim_exists: - claim_exists[activated.claim_hash] = activated.claim_hash in self.pending_claim_txos or ( + claim_exists[activated.claim_hash] = activated.claim_hash in self.claim_hash_to_txo or ( self.db.get_claim_txo(activated.claim_hash) is not None) - if claim_exists[activated.claim_hash] and activated.claim_hash not in self.staged_pending_abandoned: + if claim_exists[activated.claim_hash] and activated.claim_hash not in self.abandoned_claims: v = future_amount, activated, activated_txos[-1] future_activations[activated.name][activated.claim_hash] = v for name, future_activated in activate_in_future.items(): for claim_hash, activated in future_activated.items(): if claim_hash not in claim_exists: - claim_exists[claim_hash] = claim_hash in self.pending_claim_txos or ( + claim_exists[claim_hash] = claim_hash in self.claim_hash_to_txo or ( self.db.get_claim_txo(claim_hash) is not None) if not claim_exists[claim_hash]: continue - if claim_hash in self.staged_pending_abandoned: + if claim_hash in self.abandoned_claims: continue for txo in activated: v = txo[1], PendingActivationValue(claim_hash, name), txo[0] future_activations[name][claim_hash] = v if txo[0].is_claim: - self.possible_future_activated_claim[(name, claim_hash)] = txo[1] + self.possible_future_claim_amount_by_name_and_hash[(name, claim_hash)] = txo[1] else: - self.possible_future_activated_support[claim_hash].append(txo[1]) + self.possible_future_support_amounts_by_claim_hash[claim_hash].append(txo[1]) # process takeovers checked_names = set() - for name, activated in self.pending_activated.items(): + for name, activated in self.activation_by_claim_by_name.items(): checked_names.add(name) controlling = controlling_claims[name] amounts = { claim_hash: self._get_pending_effective_amount(name, claim_hash) - for claim_hash in activated.keys() if claim_hash not in self.staged_pending_abandoned + for claim_hash in activated.keys() if claim_hash not in self.abandoned_claims } # if there is a controlling claim include it in the amounts to ensure it remains the max - if controlling and controlling.claim_hash not in self.staged_pending_abandoned: + if controlling and controlling.claim_hash not in self.abandoned_claims: amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash) winning_claim_hash = max(amounts, key=lambda x: amounts[x]) if not controlling or (winning_claim_hash != controlling.claim_hash and @@ -1018,14 +1018,14 @@ class BlockProcessor: # print(f"\ttakeover by {winning_claim_hash.hex()} triggered early activation and " # f"takeover by {winning_including_future_activations.hex()} at {height}") # handle a pending activated claim jumping the takeover delay when another name takes over - if winning_including_future_activations not in self.pending_claim_txos: + if winning_including_future_activations not in self.claim_hash_to_txo: claim = self.db.get_claim_txo(winning_including_future_activations) tx_num = claim.tx_num position = claim.position amount = claim.amount activation = self.db.get_activation(tx_num, position) else: - tx_num, position = self.pending_claim_txos[winning_including_future_activations] + tx_num, position = self.claim_hash_to_txo[winning_including_future_activations] amount = None activation = None for (k, tx_amount) in activate_in_future[name][winning_including_future_activations]: @@ -1035,13 +1035,13 @@ class BlockProcessor: break assert None not in (amount, activation) # update the claim that's activating early - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, activation, name, amount ).get_remove_activate_ops() ) - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, height, name, amount @@ -1049,21 +1049,21 @@ class BlockProcessor: ) for (k, amount) in activate_in_future[name][winning_including_future_activations]: txo = (k.tx_num, k.position) - if txo in self.possible_future_support_txos[winning_including_future_activations]: + if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]: t = ACTIVATED_SUPPORT_TXO_TYPE - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( t, winning_including_future_activations, k.tx_num, k.position, k.height, name, amount ).get_remove_activate_ops() ) - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( t, winning_including_future_activations, k.tx_num, k.position, height, name, amount ).get_activate_ops() ) - self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) + self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) elif not controlling or (winning_claim_hash != controlling.claim_hash and name in names_with_abandoned_controlling_claims) or \ ((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])): @@ -1073,27 +1073,27 @@ class BlockProcessor: amount = self.db.get_claim_txo_amount( winning_claim_hash ) - if winning_claim_hash in self.pending_claim_txos: - tx_num, position = self.pending_claim_txos[winning_claim_hash] - amount = self.pending_claims[(tx_num, position)].amount + if winning_claim_hash in self.claim_hash_to_txo: + tx_num, position = self.claim_hash_to_txo[winning_claim_hash] + amount = self.txo_to_claim[(tx_num, position)].amount else: tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position if previous_pending_activate.height > height: # the claim had a pending activation in the future, move it to now if tx_num < self.tx_count: - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, previous_pending_activate.height, name, amount ).get_remove_activate_ops() ) - self.claimtrie_stash.extend( + self.db_op_stack.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, height, name, amount ).get_activate_ops() ) - self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) + self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) elif winning_claim_hash == controlling.claim_hash: # print("\tstill winning") pass @@ -1109,22 +1109,22 @@ class BlockProcessor: controlling = get_controlling(name) amounts = { claim_hash: self._get_pending_effective_amount(name, claim_hash) - for claim_hash in self.db.get_claims_for_name(name) if claim_hash not in self.staged_pending_abandoned + for claim_hash in self.db.get_claims_for_name(name) if claim_hash not in self.abandoned_claims } - if controlling and controlling.claim_hash not in self.staged_pending_abandoned: + if controlling and controlling.claim_hash not in self.abandoned_claims: amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash) winning = max(amounts, key=lambda x: amounts[x]) if (controlling and winning != controlling.claim_hash) or (not controlling and winning): # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") - self.claimtrie_stash.extend(get_takeover_name_ops(name, winning, height, controlling)) + self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling)) # gather cumulative removed/touched sets to update the search index - self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys())) + self.removed_claims_to_send_es.update(set(self.abandoned_claims.keys())) self.touched_claims_to_send_es.update( - set(self.staged_activated_support.keys()).union( - set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys()) + set(self.activated_support_amount_by_claim.keys()).union( + set(claim_hash for (_, claim_hash) in self.activated_claim_amount_by_name_and_hash.keys()) ).union(self.signatures_changed).union( - set(self.removed_active_support.keys()) + set(self.removed_active_support_amount_by_claim.keys()) ).difference(self.removed_claims_to_send_es) ) @@ -1133,37 +1133,36 @@ class BlockProcessor: removed_claim = self.db.get_claim_txo(removed) if removed_claim: amt = self.db.get_url_effective_amount( - removed_claim.name, removed_claim.tx_num, removed_claim.position, removed + removed_claim.name, removed ) - if amt and amt > 0: - self.claimtrie_stash.extend(get_remove_effective_amount_ops( - removed_claim.name, amt, removed_claim.tx_num, - removed_claim.position, removed + if amt: + self.db_op_stack.extend(get_remove_effective_amount_ops( + removed_claim.name, amt.effective_amount, amt.tx_num, + amt.position, removed )) for touched in self.touched_claims_to_send_es: - if touched in self.pending_claim_txos: - pending = self.pending_claims[self.pending_claim_txos[touched]] + if touched in self.claim_hash_to_txo: + pending = self.txo_to_claim[self.claim_hash_to_txo[touched]] name, tx_num, position = pending.name, pending.tx_num, pending.position claim_from_db = self.db.get_claim_txo(touched) if claim_from_db: - amount = self.db.get_url_effective_amount( - name, claim_from_db.tx_num, claim_from_db.position, touched - ) - if amount and amount > 0: - self.claimtrie_stash.extend(get_remove_effective_amount_ops( - name, amount, claim_from_db.tx_num, claim_from_db.position, touched + claim_amount_info = self.db.get_url_effective_amount(name, touched) + if claim_amount_info: + self.db_op_stack.extend(get_remove_effective_amount_ops( + name, claim_amount_info.effective_amount, claim_amount_info.tx_num, + claim_amount_info.position, touched )) else: v = self.db.get_claim_txo(touched) if not v: continue name, tx_num, position = v.name, v.tx_num, v.position - amt = self.db.get_url_effective_amount(name, tx_num, position, touched) - if amt and amt > 0: - self.claimtrie_stash.extend(get_remove_effective_amount_ops( - name, amt, tx_num, position, touched + amt = self.db.get_url_effective_amount(name, touched) + if amt: + self.db_op_stack.extend(get_remove_effective_amount_ops( + name, amt.effective_amount, amt.tx_num, amt.position, touched )) - self.claimtrie_stash.extend( + self.db_op_stack.extend( get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), tx_num, position, touched) ) @@ -1184,7 +1183,7 @@ class BlockProcessor: # Use local vars for speed in the loops put_utxo = self.utxo_cache.__setitem__ - claimtrie_stash_extend = self.claimtrie_stash.extend + claimtrie_stash_extend = self.db_op_stack.extend spend_utxo = self.spend_utxo undo_info_append = undo_info.append update_touched = self.touched.update @@ -1261,7 +1260,7 @@ class BlockProcessor: self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) - undo_claims = b''.join(op.invert().pack() for op in self.claimtrie_stash) + undo_claims = b''.join(op.invert().pack() for op in self.db_op_stack) # print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash))) if height >= self.daemon.cached_height() - self.env.reorg_limit: @@ -1275,20 +1274,20 @@ class BlockProcessor: self.db.flush_dbs(self.flush_data()) - self.claimtrie_stash.clear() - self.pending_claims.clear() - self.pending_claim_txos.clear() - self.pending_supports.clear() - self.pending_support_txos.clear() - self.pending_removed_support.clear() - self.staged_pending_abandoned.clear() - self.removed_active_support.clear() - self.staged_activated_support.clear() - self.staged_activated_claim.clear() - self.pending_activated.clear() - self.possible_future_activated_claim.clear() - self.possible_future_activated_support.clear() - self.possible_future_support_txos.clear() + self.db_op_stack.clear() + self.txo_to_claim.clear() + self.claim_hash_to_txo.clear() + self.support_txos_by_claim.clear() + self.support_txo_to_claim.clear() + self.removed_support_txos_by_name_by_claim.clear() + self.abandoned_claims.clear() + self.removed_active_support_amount_by_claim.clear() + self.activated_support_amount_by_claim.clear() + self.activated_claim_amount_by_name_and_hash.clear() + self.activation_by_claim_by_name.clear() + self.possible_future_claim_amount_by_name_and_hash.clear() + self.possible_future_support_amounts_by_claim_hash.clear() + self.possible_future_support_txos_by_claim_hash.clear() self.pending_channels.clear() self.amount_cache.clear() self.signatures_changed.clear() @@ -1523,7 +1522,7 @@ class BlockProcessor: self._caught_up_event = caught_up_event try: await self.db.open_dbs() - self.claimtrie_stash = RevertableOpStack(self.db.db.get) + self.db_op_stack = RevertableOpStack(self.db.db.get) self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 532e78ecd..4fed8238f 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -18,6 +18,10 @@ class RevertableOp: self.key = key self.value = value + @property + def is_delete(self) -> bool: + return not self.is_put + def invert(self) -> 'RevertableOp': raise NotImplementedError() @@ -26,7 +30,7 @@ class RevertableOp: Serialize to bytes """ return struct.pack( - f'>BHH{len(self.key)}s{len(self.value)}s', self.is_put, len(self.key), len(self.value), self.key, + f'>BHH{len(self.key)}s{len(self.value)}s', int(self.is_put), len(self.key), len(self.value), self.key, self.value ) @@ -76,12 +80,16 @@ class RevertableDelete(RevertableOp): class RevertablePut(RevertableOp): - is_put = 1 + is_put = True def invert(self): return RevertableDelete(self.key, self.value) +class OpStackIntegrity(Exception): + pass + + class RevertableOpStack: def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]): self._get = get_fn @@ -90,24 +98,27 @@ class RevertableOpStack: def append(self, op: RevertableOp): inverted = op.invert() if self._items[op.key] and inverted == self._items[op.key][-1]: - self._items[op.key].pop() + self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both + return elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op - pass # raise an error? - else: - if op.is_put: - stored = self._get(op.key) - if stored is not None: - assert RevertableDelete(op.key, stored) in self._items[op.key], \ - f"db op tries to add on top of existing key: {op}" - self._items[op.key].append(op) - else: - stored = self._get(op.key) - if stored is not None and stored != op.value: - assert RevertableDelete(op.key, stored) in self._items[op.key], f"delete {op}" - else: - assert stored is not None, f"db op tries to delete nonexistent key: {op}" - assert stored == op.value, f"db op tries to delete with incorrect value: {op}" - self._items[op.key].append(op) + return # raise an error? + stored_val = self._get(op.key) + has_stored_val = stored_val is not None + delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) + will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) + if op.is_put and has_stored_val and not will_delete_existing_stored: + raise OpStackIntegrity( + f"db op tries to add on top of existing key without deleting first: {op}" + ) + elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: + # there is a value and we're not deleting it in this op + # check that a delete for the stored value is in the stack + raise OpStackIntegrity(f"delete {op}") + elif op.is_delete and not has_stored_val: + raise OpStackIntegrity("db op tries to delete nonexistent key: {op}") + elif op.is_delete and stored_val != op.value: + raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") + self._items[op.key].append(op) def extend(self, ops: Iterable[RevertableOp]): for op in ops: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index aba71a8fc..4b583c163 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -394,14 +394,11 @@ class LevelDB: return support_only return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) - def get_url_effective_amount(self, name: str, tx_num: int, position: int, claim_hash: bytes): + def get_url_effective_amount(self, name: str, claim_hash: bytes): for _k, _v in self.db.iterator(prefix=Prefixes.effective_amount.pack_partial_key(name)): v = Prefixes.effective_amount.unpack_value(_v) if v.claim_hash == claim_hash: - k = Prefixes.effective_amount.unpack_key(_k) - if k.tx_num == tx_num and k.position == position: - return k.effective_amount - return + return Prefixes.effective_amount.unpack_key(_k) def get_claims_for_name(self, name): claims = [] @@ -654,7 +651,6 @@ class LevelDB: k = Prefixes.pending_activation.unpack_key(_k) v = Prefixes.pending_activation.unpack_value(_v) activated[v].append(k) - return activated async def _read_tx_counts(self): @@ -992,7 +988,7 @@ class LevelDB: batch_delete(op.key) flush_data.undo.clear() - flush_data.claimtrie_stash.clear() + flush_data.db_op_stack.clear() while self.fs_height > flush_data.height: self.fs_height -= 1