diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 10a15830c..af71e386e 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -206,6 +206,10 @@ class BlockProcessor: self.history_cache = {} self.status_server = StatusServer() + ################################# + # attributes used for calculating stake activations and takeovers per block + ################################# + # txo to pending claim self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} # claim hash to pending claim txo @@ -572,10 +576,10 @@ class BlockProcessor: ops.extend(self._abandon(spent_claims)) return ops - def _get_pending_claim_amount(self, name: str, claim_hash: bytes) -> int: + def _get_pending_claim_amount(self, name: str, claim_hash: bytes, height=None) -> int: if (name, claim_hash) in self.staged_activated_claim: return self.staged_activated_claim[(name, claim_hash)] - return self.db._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.height + 1) + return self.db._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height or (self.height + 1)) def _get_pending_claim_name(self, claim_hash: bytes) -> Optional[str]: assert claim_hash is not None @@ -585,16 +589,16 @@ class BlockProcessor: if claim_info: return claim_info[1].name - def _get_pending_supported_amount(self, claim_hash: bytes) -> int: - amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.height + 1) or 0 + def _get_pending_supported_amount(self, claim_hash: bytes, height: Optional[int] = None) -> int: + amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, height or (self.height + 1)) or 0 if claim_hash in self.staged_activated_support: amount += sum(self.staged_activated_support[claim_hash]) if claim_hash in self.removed_active_support: return amount - sum(self.removed_active_support[claim_hash]) return amount - def _get_pending_effective_amount(self, name: str, claim_hash: bytes) -> int: - claim_amount = self._get_pending_claim_amount(name, claim_hash) + def _get_pending_effective_amount(self, name: str, claim_hash: bytes, height: Optional[int] = None) -> int: + claim_amount = self._get_pending_claim_amount(name, claim_hash, height=height) support_amount = self._get_pending_supported_amount(claim_hash) return claim_amount + support_amount @@ -611,6 +615,36 @@ class BlockProcessor: _controlling = controlling_claims[_name] return _controlling + def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int, + amount: int, is_support: bool) -> List['RevertableOp']: + controlling = get_controlling(name) + nothing_is_controlling = not controlling + staged_is_controlling = False if not controlling else claim_hash == controlling.claim_hash + controlling_is_abandoned = False if not controlling else \ + controlling.claim_hash in names_with_abandoned_controlling_claims + + if nothing_is_controlling or staged_is_controlling or controlling_is_abandoned: + delay = 0 + elif is_new_claim: + delay = self.coin.get_delay_for_name(height - controlling.height) + else: + controlling_effective_amount = self._get_pending_effective_amount(name, controlling.claim_hash) + staged_effective_amount = self._get_pending_effective_amount(name, claim_hash) + staged_update_could_cause_takeover = staged_effective_amount > controlling_effective_amount + delay = 0 if not staged_update_could_cause_takeover else self.coin.get_delay_for_name( + height - controlling.height + ) + if delay == 0: # if delay was 0 it needs to be considered for takeovers + activated_at_height[PendingActivationValue(claim_hash, name)].append( + PendingActivationKey( + height, ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout + ) + ) + return StagedActivation( + ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout, + height + delay, name, amount + ).get_activate_ops() + ops = [] names_with_abandoned_controlling_claims: List[str] = [] @@ -648,28 +682,8 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.pending_claims.items(): - controlling = get_controlling(staged.name) - delay = 0 - if not controlling or staged.claim_hash == controlling.claim_hash or \ - controlling.claim_hash in names_with_abandoned_controlling_claims: - pass - else: - controlling_effective_amount = self._get_pending_effective_amount(staged.name, controlling.claim_hash) - amount = self._get_pending_effective_amount(staged.name, staged.claim_hash) - delay = 0 - # if this is an OP_CLAIM or the amount appears to trigger a takeover, delay - if not staged.is_update or (amount > controlling_effective_amount): - delay = self.coin.get_delay_for_name(height - controlling.height) - ops.extend( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, - height + delay, staged.name, staged.amount - ).get_activate_ops() - ) - if delay == 0: # if delay was 0 it needs to be considered for takeovers - activated_at_height[PendingActivationValue(staged.claim_hash, staged.name)].append( - PendingActivationKey(height, ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout) - ) + ops.extend(get_delayed_activate_ops(staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, + staged.amount, is_support=False)) # and the supports for (tx_num, nout), (claim_hash, amount) in self.pending_support_txos.items(): @@ -677,29 +691,13 @@ class BlockProcessor: continue elif claim_hash in self.pending_claim_txos: name = self.pending_claims[self.pending_claim_txos[claim_hash]].name - is_update = self.pending_claims[self.pending_claim_txos[claim_hash]].is_update + staged_is_new_claim = not self.pending_claims[self.pending_claim_txos[claim_hash]].is_update else: k, v = self.db.get_claim_txo(claim_hash) name = v.name - is_update = (v.root_tx_num, v.root_position) != (k.tx_num, k.position) - - controlling = get_controlling(name) - delay = 0 - if not controlling or claim_hash == controlling.claim_hash: - pass - elif not is_update or self._get_pending_effective_amount(staged.name, - claim_hash) > self._get_pending_effective_amount(staged.name, controlling.claim_hash): - delay = self.coin.get_delay_for_name(height - controlling.height) - if delay == 0: - activated_at_height[PendingActivationValue(claim_hash, name)].append( - PendingActivationKey(height + delay, ACTIVATED_SUPPORT_TXO_TYPE, tx_num, nout) - ) - ops.extend( - StagedActivation( - ACTIVATED_SUPPORT_TXO_TYPE, claim_hash, tx_num, nout, - height + delay, name, amount - ).get_activate_ops() - ) + staged_is_new_claim = (v.root_tx_num, v.root_position) == (k.tx_num, k.position) + ops.extend(get_delayed_activate_ops(name, claim_hash, staged_is_new_claim, tx_num, nout, amount, + is_support=True)) # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): @@ -774,57 +772,103 @@ class BlockProcessor: controlling = get_controlling(need_takeover) ops.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) - # process takeovers from the combined newly added and previously scheduled claims + # scan for possible takeovers out of the accumulated activations, of these make sure there + # aren't any future activations for the taken over names with yet higher amounts, if there are + # these need to get activated now and take over instead. for example: + # claim A is winning for 0.1 for long enough for a > 1 takeover delay + # claim B is made for 0.2 + # a block later, claim C is made for 0.3, it will schedule to activate 1 (or rarely 2) block(s) after B + # upon the delayed activation of B, we need to detect to activate C and make it take over early instead + future_activations = defaultdict(dict) + for activated, activated_txos in self.db.get_future_activated(height).items(): + # uses the pending effective amount for the future activation height, not the current height + future_effective_amount = self._get_pending_effective_amount( + activated.name, activated.claim_hash, activated_txos[-1].height + 1 + ) + v = future_effective_amount, activated, activated_txos[-1] + future_activations[activated.name][activated.claim_hash] = v + + # process takeovers checked_names = set() for name, activated in self.pending_activated.items(): checked_names.add(name) - if name in names_with_abandoned_controlling_claims: - print(f'\tabandoned {name} need takeover') controlling = controlling_claims[name] amounts = { claim_hash: self._get_pending_effective_amount(name, claim_hash) for claim_hash in activated.keys() if claim_hash not in self.staged_pending_abandoned } + # if there is a controlling claim include it in the amounts to ensure it remains the max if controlling and controlling.claim_hash not in self.staged_pending_abandoned: amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash) - winning = max(amounts, key=lambda x: amounts[x]) - if not controlling or (winning != controlling.claim_hash and + winning_claim_hash = max(amounts, key=lambda x: amounts[x]) + if not controlling or (winning_claim_hash != controlling.claim_hash and name in names_with_abandoned_controlling_claims) or \ - ((winning != controlling.claim_hash) and (amounts[winning] > amounts[controlling.claim_hash])): - if (name, winning) in need_reactivate_if_takes_over: - previous_pending_activate = need_reactivate_if_takes_over[(name, winning)] - amount = self.db.get_claim_txo_amount( - winning, previous_pending_activate.tx_num, previous_pending_activate.position + ((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])): + amounts_with_future_activations = {claim_hash: amount for claim_hash, amount in amounts.items()} + amounts_with_future_activations.update( + { + claim_hash: effective_amount + for claim_hash, (effective_amount, _, _) in future_activations[name].items() + } + ) + winning_including_future_activations = max( + amounts_with_future_activations, key=lambda x: amounts_with_future_activations[x] + ) + if winning_claim_hash != winning_including_future_activations: + print(f"\ttakeover of {name} by {winning_claim_hash.hex()} triggered early activation and " + f"takeover by {winning_including_future_activations.hex()} at {height}") + _, v, k = future_activations[name][winning_including_future_activations] + amount = self._get_pending_claim_amount( + name, winning_including_future_activations, k.height + 1 ) - if winning in self.pending_claim_txos: - tx_num, position = self.pending_claim_txos[winning] - amount = self.pending_claims[(tx_num, position)].amount - else: - tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position - if previous_pending_activate.height > height: - # the claim had a pending activation in the future, move it to now - ops.extend( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning, tx_num, - position, previous_pending_activate.height, name, amount - ).get_remove_activate_ops() + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, k.tx_num, + k.position, k.height, name, amount + ).get_remove_activate_ops() + ) + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, k.tx_num, + k.position, height, name, amount + ).get_activate_ops() + ) + ops.extend(get_takeover_name_ops(name, winning_including_future_activations, height)) + elif not controlling or (winning_claim_hash != controlling.claim_hash and + name in names_with_abandoned_controlling_claims) or \ + ((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])): + print(f"\ttakeover {name} by {winning_claim_hash.hex()} at {height}") + if (name, winning_claim_hash) in need_reactivate_if_takes_over: + previous_pending_activate = need_reactivate_if_takes_over[(name, winning_claim_hash)] + amount = self.db.get_claim_txo_amount( + winning_claim_hash, previous_pending_activate.tx_num, previous_pending_activate.position ) - ops.extend( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning, tx_num, - position, height, name, amount - ).get_activate_ops() - ) - ops.extend(get_takeover_name_ops(name, winning, height)) + if winning_claim_hash in self.pending_claim_txos: + tx_num, position = self.pending_claim_txos[winning_claim_hash] + amount = self.pending_claims[(tx_num, position)].amount + else: + tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position + if previous_pending_activate.height > height: + # the claim had a pending activation in the future, move it to now + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, + position, previous_pending_activate.height, name, amount + ).get_remove_activate_ops() + ) + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, + position, height, name, amount + ).get_activate_ops() + ) + ops.extend(get_takeover_name_ops(name, winning_claim_hash, height)) + elif winning_claim_hash == controlling.claim_hash: + print("\tstill winning") + pass else: - ops.extend(get_takeover_name_ops(name, winning, height)) - - elif winning == controlling.claim_hash: - print("\tstill winning") - pass - else: - print("\tno takeover") - pass + print("\tno takeover") + pass # handle remaining takeovers from abandoned supports for (name, claim_hash), amounts in abandoned_support_check_need_takeover.items(): @@ -871,9 +915,6 @@ class BlockProcessor: append_hashX_by_tx = hashXs_by_tx.append hashX_from_script = self.coin.hashX_from_script - zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]] = {} - abandoned_or_expired_controlling = set() - for tx, tx_hash in txs: spent_claims = {} diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 70f377d9f..42bcb9077 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -5,11 +5,6 @@ from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue - - - - - def length_encoded_name(name: str) -> bytes: encoded = name.encode('utf-8') return len(encoded).to_bytes(2, byteorder='big') + encoded @@ -96,7 +91,6 @@ def get_remove_name_ops(name: str, claim_hash: bytes, height: int) -> typing.Lis def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, previous_winning: Optional[ClaimTakeoverValue] = None): if previous_winning: - # print(f"takeover previously owned {name} - {claim_hash.hex()} at {takeover_height}") return [ RevertableDelete( *Prefixes.claim_takeover.pack_item( @@ -109,7 +103,6 @@ def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, ) ) ] - # print(f"takeover {name} - {claim_hash[::-1].hex()} at {takeover_height}") return [ RevertablePut( *Prefixes.claim_takeover.pack_item( diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 54987335f..ecdfe5964 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -464,6 +464,16 @@ class LevelDB: activated[v].append(k) return activated + def get_future_activated(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: + activated = defaultdict(list) + for i in range(self.coin.maxTakeoverDelay): + prefix = Prefixes.pending_activation.pack_partial_key(height+1+i, ACTIVATED_CLAIM_TXO_TYPE) + for _k, _v in self.db.iterator(prefix=prefix): + k = Prefixes.pending_activation.unpack_key(_k) + v = Prefixes.pending_activation.unpack_value(_v) + activated[v].append(k) + return activated + async def _read_tx_counts(self): if self.tx_counts is not None: return diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index bbfcee30a..ccc1ac060 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -434,6 +434,27 @@ class ResolveClaimTakeovers(BaseResolveTestCase): await self.assertNoClaimForName(name) await self._test_activation_delay() + async def test_early_takeover(self): + name = 'derp' + # block 207 + first_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + + await self.generate(96) + # block 304, activates at 307 + second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] + # block 305, activates at 308 (but gets triggered early by the takeover by the second claim) + third_claim_id = (await self.stream_create(name, '0.3', allow_duplicate_name=True))['outputs'][0]['claim_id'] + self.assertNotEqual(first_claim_id, second_claim_id) + # takeover should not have happened yet + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + async def test_block_takeover_with_delay_1_support(self): name = 'derp' # initially claim the name