diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 2f023d30f..abe6efbd2 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -232,6 +232,10 @@ class BlockProcessor: # used to process takeovers due to added activations self.pending_activated: DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]] = \ defaultdict(lambda: defaultdict(list)) + # these are used for detecting early takeovers by not yet activated claims/supports + self.possible_future_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list) + self.possible_future_activated_claim: Dict[Tuple[str, bytes], int] = {} + self.possible_future_support_txos: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -579,6 +583,8 @@ class BlockProcessor: def _get_pending_claim_amount(self, name: str, claim_hash: bytes, height=None) -> int: if (name, claim_hash) in self.staged_activated_claim: return self.staged_activated_claim[(name, claim_hash)] + if (name, claim_hash) in self.possible_future_activated_claim: + return self.possible_future_activated_claim[(name, claim_hash)] return self.db._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height or (self.height + 1)) def _get_pending_claim_name(self, claim_hash: bytes) -> Optional[str]: @@ -593,13 +599,15 @@ class BlockProcessor: amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, height or (self.height + 1)) or 0 if claim_hash in self.staged_activated_support: amount += sum(self.staged_activated_support[claim_hash]) + if claim_hash in self.possible_future_activated_support: + amount += sum(self.possible_future_activated_support[claim_hash]) if claim_hash in self.removed_active_support: return amount - sum(self.removed_active_support[claim_hash]) return amount def _get_pending_effective_amount(self, name: str, claim_hash: bytes, height: Optional[int] = None) -> int: claim_amount = self._get_pending_claim_amount(name, claim_hash, height=height) - support_amount = self._get_pending_supported_amount(claim_hash) + support_amount = self._get_pending_supported_amount(claim_hash, height=height) return claim_amount + support_amount def _get_takeover_ops(self, height: int) -> List['RevertableOp']: @@ -615,6 +623,14 @@ class BlockProcessor: _controlling = controlling_claims[_name] return _controlling + ops = [] + names_with_abandoned_controlling_claims: List[str] = [] + + # get the claims and supports previously scheduled to be activated at this block + activated_at_height = self.db.get_activated_at_height(height) + activate_in_future = defaultdict(lambda: defaultdict(list)) + future_activations = defaultdict(dict) + def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int, amount: int, is_support: bool) -> List['RevertableOp']: controlling = get_controlling(name) @@ -640,17 +656,20 @@ class BlockProcessor: height, ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout ) ) + else: # if the delay was higher if still needs to be considered if something else triggers a takeover + activate_in_future[name][claim_hash].append(( + PendingActivationKey( + height + delay, ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, + tx_num, nout + ), amount + )) + if is_support: + self.possible_future_support_txos[claim_hash].append((tx_num, nout)) return StagedActivation( ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout, height + delay, name, amount ).get_activate_ops() - ops = [] - names_with_abandoned_controlling_claims: List[str] = [] - - # get the claims and supports previously scheduled to be activated at this block - activated_at_height = self.db.get_activated_at_height(height) - # determine names needing takeover/deletion due to controlling claims being abandoned # and add ops to deactivate abandoned claims for claim_hash, staged in self.staged_pending_abandoned.items(): @@ -682,8 +701,9 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.pending_claims.items(): - ops.extend(get_delayed_activate_ops(staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, - staged.amount, is_support=False)) + ops.extend(get_delayed_activate_ops( + staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False + )) # and the supports for (tx_num, nout), (claim_hash, amount) in self.pending_support_txos.items(): @@ -696,8 +716,9 @@ class BlockProcessor: k, v = self.db.get_claim_txo(claim_hash) name = v.name staged_is_new_claim = (v.root_tx_num, v.root_position) == (k.tx_num, k.position) - ops.extend(get_delayed_activate_ops(name, claim_hash, staged_is_new_claim, tx_num, nout, amount, - is_support=True)) + ops.extend(get_delayed_activate_ops( + name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True + )) # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): @@ -779,15 +800,24 @@ class BlockProcessor: # claim B is made for 0.2 # a block later, claim C is made for 0.3, it will schedule to activate 1 (or rarely 2) block(s) after B # upon the delayed activation of B, we need to detect to activate C and make it take over early instead - future_activations = defaultdict(dict) for activated, activated_txos in self.db.get_future_activated(height).items(): # uses the pending effective amount for the future activation height, not the current height - future_effective_amount = self._get_pending_effective_amount( + future_amount = self._get_pending_claim_amount( activated.name, activated.claim_hash, activated_txos[-1].height + 1 ) - v = future_effective_amount, activated, activated_txos[-1] + v = future_amount, activated, activated_txos[-1] future_activations[activated.name][activated.claim_hash] = v + for name, future_activated in activate_in_future.items(): + for claim_hash, activated in future_activated.items(): + for txo in activated: + v = txo[1], PendingActivationValue(claim_hash, name), txo[0] + future_activations[name][claim_hash] = v + if v[2].is_claim: + self.possible_future_activated_claim[(name, claim_hash)] = v[0] + else: + self.possible_future_activated_support[claim_hash].append(v[0]) + # process takeovers checked_names = set() for name, activated in self.pending_activated.items(): @@ -807,32 +837,68 @@ class BlockProcessor: amounts_with_future_activations = {claim_hash: amount for claim_hash, amount in amounts.items()} amounts_with_future_activations.update( { - claim_hash: effective_amount - for claim_hash, (effective_amount, _, _) in future_activations[name].items() + claim_hash: self._get_pending_effective_amount( + name, claim_hash, self.height + 1 + self.coin.maxTakeoverDelay + ) for claim_hash in future_activations[name] } ) winning_including_future_activations = max( amounts_with_future_activations, key=lambda x: amounts_with_future_activations[x] ) + print(amounts_with_future_activations) + print(amounts) if winning_claim_hash != winning_including_future_activations: print(f"\ttakeover of {name} by {winning_claim_hash.hex()} triggered early activation and " f"takeover by {winning_including_future_activations.hex()} at {height}") - _, v, k = future_activations[name][winning_including_future_activations] - amount = self._get_pending_claim_amount( - name, winning_including_future_activations, k.height + 1 - ) + # handle a pending activated claim jumping the takeover delay when another name takes over + if winning_including_future_activations not in self.pending_claim_txos: + claim = self.db.get_claim_txo(winning_including_future_activations) + tx_num = claim[0].tx_num + position = claim[0].position + amount = claim[1].amount + activation = self.db.get_activation(tx_num, position) + + else: + tx_num, position = self.pending_claim_txos[winning_including_future_activations] + amount = None + activation = None + for (k, tx_amount) in activate_in_future[name][winning_including_future_activations]: + if (k.tx_num, k.position) == (tx_num, position): + amount = tx_amount + activation = k.height + assert None not in (amount, activation) ops.extend( StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, k.tx_num, - k.position, k.height, name, amount + ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, + position, activation, name, amount ).get_remove_activate_ops() ) ops.extend( StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, k.tx_num, - k.position, height, name, amount + ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, + position, height, name, amount ).get_activate_ops() ) + + for (k, amount) in activate_in_future[name][winning_including_future_activations]: + txo = (k.tx_num, k.position) + if txo in self.possible_future_support_txos[winning_including_future_activations]: + t = ACTIVATED_SUPPORT_TXO_TYPE + else: + t = ACTIVATED_CLAIM_TXO_TYPE + ops.extend( + StagedActivation( + t, winning_including_future_activations, k.tx_num, + k.position, k.height, name, amount + ).get_remove_activate_ops() + ) + ops.extend( + StagedActivation( + t, winning_including_future_activations, k.tx_num, + k.position, height, name, amount + ).get_activate_ops() + ) + ops.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) elif not controlling or (winning_claim_hash != controlling.claim_hash and name in names_with_abandoned_controlling_claims) or \ @@ -875,9 +941,7 @@ class BlockProcessor: if name in checked_names: continue checked_names.add(name) - controlling = get_controlling(name) - amounts = { claim_hash: self._get_pending_effective_amount(name, claim_hash) for claim_hash in self.db.get_claims_for_name(name) if claim_hash not in self.staged_pending_abandoned @@ -1012,6 +1076,9 @@ class BlockProcessor: self.staged_activated_support.clear() self.staged_activated_claim.clear() self.pending_activated.clear() + self.possible_future_activated_claim.clear() + self.possible_future_activated_support.clear() + self.possible_future_support_txos.clear() for cache in self.search_cache.values(): cache.clear() diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 481e2178c..9493d0054 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -3,6 +3,7 @@ from typing import Optional from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue +from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE def length_encoded_name(name: str) -> bytes: @@ -51,7 +52,9 @@ class StagedActivation(typing.NamedTuple): def _get_add_remove_activate_ops(self, add=True): op = RevertablePut if add else RevertableDelete - print(f"\t{'add' if add else 'remove'} {self.txo_type}, {self.tx_num}, {self.position}, activation={self.activation_height}, {self.name}") + print(f"\t{'add' if add else 'remove'} {'claim' if self.txo_type == ACTIVATED_CLAIM_TXO_TYPE else 'support'}," + f" {self.tx_num}, {self.position}, activation={self.activation_height}, {self.name}, " + f"amount={self.amount}") return [ op( *Prefixes.activated.pack_item( diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index ecdfe5964..2c0740ada 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -467,7 +467,7 @@ class LevelDB: def get_future_activated(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list) for i in range(self.coin.maxTakeoverDelay): - prefix = Prefixes.pending_activation.pack_partial_key(height+1+i, ACTIVATED_CLAIM_TXO_TYPE) + prefix = Prefixes.pending_activation.pack_partial_key(height+1+i) for _k, _v in self.db.iterator(prefix=prefix): k = Prefixes.pending_activation.unpack_key(_k) v = Prefixes.pending_activation.unpack_value(_v) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0c15651bf..1ec118581 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1059,11 +1059,12 @@ class LBRYElectrumX(SessionBase): rows = [] extra = [] stream = await self.db.fs_getclaimbyid(claim_id) + if not stream: + stream = LookupError(f"Could not find claim at {claim_id}") rows.append(stream) # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) return Outputs.to_base64(rows, extra, 0, None, None) - def assert_tx_hash(self, value): '''Raise an RPCError if the value is not a valid transaction hash.''' diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index f688f8cf3..1fac0726f 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -455,6 +455,83 @@ class ResolveClaimTakeovers(BaseResolveTestCase): await self.generate(1) await self.assertMatchClaimIsWinning(name, third_claim_id) + async def test_early_takeover_zero_delay(self): + name = 'derp' + # block 207 + first_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + + await self.generate(96) + # block 304, activates at 307 + second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, first_claim_id) + # on block 307 make a third claim with a yet higher amount, it takes over with no delay because the + # second claim activates and begins the takeover on this block + third_claim_id = (await self.stream_create(name, '0.3', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, third_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + + async def test_early_takeover_from_support_zero_delay(self): + name = 'derp' + # block 207 + first_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + + await self.generate(96) + # block 304, activates at 307 + second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, first_claim_id) + third_claim_id = (await self.stream_create(name, '0.19', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + tx = await self.daemon.jsonrpc_support_create(third_claim_id, '0.1') + await self.ledger.wait(tx) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + + async def test_early_takeover_from_support_and_claim_zero_delay(self): + name = 'derp' + # block 207 + first_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + + await self.generate(96) + # block 304, activates at 307 + second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(1) + + file_path = self.create_upload_file(data=b'hi!') + tx = await self.daemon.jsonrpc_stream_create(name, '0.19', file_path=file_path, allow_duplicate_name=True) + await self.ledger.wait(tx) + third_claim_id = tx.outputs[0].claim_id + + wallet = self.daemon.wallet_manager.get_wallet_or_default(None) + funding_accounts = wallet.get_accounts_or_all(None) + amount = self.daemon.get_dewies_or_error("amount", '0.1') + account = wallet.get_account_or_default(None) + claim_address = await account.receiving.get_or_create_usable_address() + tx = await Transaction.support( + 'derp', third_claim_id, amount, claim_address, funding_accounts, funding_accounts[0], None + ) + await tx.sign(funding_accounts) + await self.daemon.broadcast_or_release(tx, True) + await self.ledger.wait(tx) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + await self.generate(1) + await self.assertMatchClaimIsWinning(name, third_claim_id) + async def test_early_takeover_abandoned_controlling_support(self): name = 'derp' # block 207