From b6e4cb9102e5b351ba7bcdc5cc269b0fb7e4e526 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 24 May 2021 12:41:44 -0400 Subject: [PATCH] comments --- lbry/wallet/server/block_processor.py | 79 ++++++++++++++++----------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index fff008dba..10a15830c 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -206,18 +206,28 @@ class BlockProcessor: self.history_cache = {} self.status_server = StatusServer() + # txo to pending claim self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} + # claim hash to pending claim txo self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {} - self.pending_supports = defaultdict(list) - - self.pending_support_txos = {} - - self.pending_removed_support = defaultdict(lambda: defaultdict(list)) + # claim hash to lists of pending support txos + self.pending_supports: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) + # support txo: (supported claim hash, support amount) + self.pending_support_txos: Dict[Tuple[int, int], Tuple[bytes, int]] = {} + # removed supports {name: {claim_hash: [(tx_num, nout), ...]}} + self.pending_removed_support: DefaultDict[str, DefaultDict[bytes, List[Tuple[int, int]]]] = defaultdict( + lambda: defaultdict(list)) self.staged_pending_abandoned: Dict[bytes, StagedClaimtrieItem] = {} - self.removed_active_support = defaultdict(list) - self.staged_activated_support = defaultdict(list) - self.staged_activated_claim = {} - self.pending_activated = defaultdict(lambda: defaultdict(list)) + # removed activated support amounts by claim hash + self.removed_active_support: DefaultDict[bytes, List[int]] = defaultdict(list) + # pending activated support amounts by claim hash + self.staged_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list) + # pending activated name and claim hash to claim/update txo amount + self.staged_activated_claim: Dict[Tuple[str, bytes], int] = {} + # pending claim and support activations per claim hash per name, + # used to process takeovers due to added activations + self.pending_activated: DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]] = \ + defaultdict(lambda: defaultdict(list)) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -482,10 +492,11 @@ class BlockProcessor: print(f"\tspent support for lbry://{supported_name}#{spent_support.hex()} activation:{activation} {support_amount}") return StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount - ).get_spend_support_txo_ops() + StagedActivation( - ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, - support_amount - ).get_remove_activate_ops() + ).get_spend_support_txo_ops() + \ + StagedActivation( + ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, + support_amount + ).get_remove_activate_ops() return [] def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]): @@ -575,10 +586,9 @@ class BlockProcessor: return claim_info[1].name def _get_pending_supported_amount(self, claim_hash: bytes) -> int: - support_amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.height + 1) or 0 - amount = support_amount + sum( - self.staged_activated_support.get(claim_hash, []) - ) + amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.height + 1) or 0 + if claim_hash in self.staged_activated_support: + amount += sum(self.staged_activated_support[claim_hash]) if claim_hash in self.removed_active_support: return amount - sum(self.removed_active_support[claim_hash]) return amount @@ -589,13 +599,9 @@ class BlockProcessor: return claim_amount + support_amount def _get_takeover_ops(self, height: int) -> List['RevertableOp']: - ops = [] - # get takeovers from claims activated at this block - activated_at_height = self.db.get_activated_at_height(height) + # cache for controlling claims as of the previous block controlling_claims = {} - abandoned_need_takeover = [] - abandoned_support_check_need_takeover = defaultdict(list) def get_controlling(_name): if _name not in controlling_claims: @@ -605,15 +611,21 @@ class BlockProcessor: _controlling = controlling_claims[_name] return _controlling + ops = [] + names_with_abandoned_controlling_claims: List[str] = [] + + # get the claims and supports previously scheduled to be activated at this block + activated_at_height = self.db.get_activated_at_height(height) + # determine names needing takeover/deletion due to controlling claims being abandoned # and add ops to deactivate abandoned claims for claim_hash, staged in self.staged_pending_abandoned.items(): controlling = get_controlling(staged.name) if controlling and controlling.claim_hash == claim_hash: - abandoned_need_takeover.append(staged.name) + names_with_abandoned_controlling_claims.append(staged.name) print(f"\t{staged.name} needs takeover") activation = self.db.get_activation(staged.tx_num, staged.position) - if activation > 0: + if activation > 0: # db returns -1 for non-existent txos # removed queued future activation from the db ops.extend( StagedActivation( @@ -622,14 +634,16 @@ class BlockProcessor: ).get_remove_activate_ops() ) else: - # it hadn't yet been activated, db returns -1 for non-existent txos + # it hadn't yet been activated pass - # build set of controlling claims that had activated supports spent to check them for takeovers later + # get the removed activated supports for controlling claims to determine if takeovers are possible + abandoned_support_check_need_takeover = defaultdict(list) for claim_hash, amounts in self.removed_active_support.items(): name = self._get_pending_claim_name(claim_hash) controlling = get_controlling(name) - if controlling and controlling.claim_hash == claim_hash and name not in abandoned_need_takeover: + if controlling and controlling.claim_hash == claim_hash and \ + name not in names_with_abandoned_controlling_claims: abandoned_support_check_need_takeover[(name, claim_hash)].extend(amounts) # prepare to activate or delay activation of the pending claims being added this block @@ -637,7 +651,7 @@ class BlockProcessor: controlling = get_controlling(staged.name) delay = 0 if not controlling or staged.claim_hash == controlling.claim_hash or \ - controlling.claim_hash in abandoned_need_takeover: + controlling.claim_hash in names_with_abandoned_controlling_claims: pass else: controlling_effective_amount = self._get_pending_effective_amount(staged.name, controlling.claim_hash) @@ -736,7 +750,7 @@ class BlockProcessor: # go through claims where the controlling claim or supports to the controlling claim have been abandoned # check if takeovers are needed or if the name node is now empty need_reactivate_if_takes_over = {} - for need_takeover in abandoned_need_takeover: + for need_takeover in names_with_abandoned_controlling_claims: existing = self.db.get_claim_txos_for_name(need_takeover) has_candidate = False # add existing claims to the queue for the takeover @@ -764,7 +778,7 @@ class BlockProcessor: checked_names = set() for name, activated in self.pending_activated.items(): checked_names.add(name) - if name in abandoned_need_takeover: + if name in names_with_abandoned_controlling_claims: print(f'\tabandoned {name} need takeover') controlling = controlling_claims[name] amounts = { @@ -774,8 +788,9 @@ class BlockProcessor: if controlling and controlling.claim_hash not in self.staged_pending_abandoned: amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash) winning = max(amounts, key=lambda x: amounts[x]) - if not controlling or (winning != controlling.claim_hash and name in abandoned_need_takeover) or ((winning != controlling.claim_hash) and - (amounts[winning] > amounts[controlling.claim_hash])): + if not controlling or (winning != controlling.claim_hash and + name in names_with_abandoned_controlling_claims) or \ + ((winning != controlling.claim_hash) and (amounts[winning] > amounts[controlling.claim_hash])): if (name, winning) in need_reactivate_if_takes_over: previous_pending_activate = need_reactivate_if_takes_over[(name, winning)] amount = self.db.get_claim_txo_amount(