From e678df86e05836cab07d97f110b872dfc2cdef70 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 20 May 2021 13:31:40 -0400 Subject: [PATCH] claim takeovers --- lbry/wallet/server/block_processor.py | 786 +++++++++--------- lbry/wallet/server/db/__init__.py | 4 +- lbry/wallet/server/db/claimtrie.py | 169 ++-- lbry/wallet/server/db/prefixes.py | 263 ++++-- lbry/wallet/server/leveldb.py | 191 ++--- .../blockchain/test_resolve_command.py | 218 +++-- 6 files changed, 855 insertions(+), 776 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index ed58903ed..fff008dba 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -4,23 +4,24 @@ import typing from bisect import bisect_right from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, Set, DefaultDict, Dict from prometheus_client import Gauge, Histogram from collections import defaultdict import lbry from lbry.schema.claim import Claim from lbry.wallet.transaction import OutputScript, Output -from lbry.wallet.server.tx import Tx +from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.crypto.hash import hash160 from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.db import DB_PREFIXES -from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height -from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, get_force_activate_ops, get_delay_for_name -from lbry.wallet.server.db.prefixes import PendingClaimActivationPrefixRow, Prefixes -from lbry.wallet.server.db.revertable import RevertablePut +from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport +from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation +from lbry.wallet.server.db.claimtrie import get_remove_name_ops +from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE +from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue from lbry.wallet.server.udp import StatusServer if typing.TYPE_CHECKING: from lbry.wallet.server.leveldb import LevelDB @@ -204,13 +205,19 @@ class BlockProcessor: self.search_cache = {} self.history_cache = {} self.status_server = StatusServer() - self.effective_amount_changes = defaultdict(list) + self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {} - self.pending_supports = defaultdict(set) + self.pending_supports = defaultdict(list) + self.pending_support_txos = {} - self.pending_abandon = set() - self.staged_pending_abandoned = {} + + self.pending_removed_support = defaultdict(lambda: defaultdict(list)) + self.staged_pending_abandoned: Dict[bytes, StagedClaimtrieItem] = {} + self.removed_active_support = defaultdict(list) + self.staged_activated_support = defaultdict(list) + self.staged_activated_claim = {} + self.pending_activated = defaultdict(lambda: defaultdict(list)) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -241,6 +248,7 @@ class BlockProcessor: try: for block in blocks: await self.run_in_thread_with_lock(self.advance_block, block) + print("******************\n") except: self.logger.exception("advance blocks failed") raise @@ -363,7 +371,6 @@ class BlockProcessor: return start, count - # - Flushing def flush_data(self): """The data for a flush. The lock must be taken.""" @@ -386,461 +393,448 @@ class BlockProcessor: await self.flush(True) self.next_cache_check = time.perf_counter() + 30 - def check_cache_size(self): - """Flush a cache if it gets too big.""" - # Good average estimates based on traversal of subobjects and - # requesting size from Python (see deep_getsizeof). - one_MB = 1000*1000 - utxo_cache_size = len(self.utxo_cache) * 205 - db_deletes_size = len(self.db_deletes) * 57 - hist_cache_size = len(self.db.hist_unflushed) * 180 + self.db.hist_unflushed_count * 4 - # Roughly ntxs * 32 + nblocks * 42 - tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 - + (self.height - self.db.fs_height) * 42) - utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB - hist_MB = (hist_cache_size + tx_hash_size) // one_MB - - self.logger.info('our height: {:,d} daemon: {:,d} ' - 'UTXOs {:,d}MB hist {:,d}MB' - .format(self.height, self.daemon.cached_height(), - utxo_MB, hist_MB)) - - # Flush history if it takes up over 20% of cache memory. - # Flush UTXOs once they take up 80% of cache memory. - cache_MB = self.env.cache_MB - if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5: - return utxo_MB >= cache_MB * 4 // 5 - return None - - def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout, - spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]], - zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']: + def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, + spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']: try: claim_name = txo.normalized_name except UnicodeDecodeError: claim_name = ''.join(chr(c) for c in txo.script.values['claim_name']) - if script.is_claim_name: - claim_hash = hash160(tx_hash + pack('>I', idx))[::-1] - # print(f"\tnew lbry://{claim_name}#{claim_hash.hex()} ({tx_count} {txout.value})") + if txo.script.is_claim_name: + claim_hash = hash160(tx_hash + pack('>I', nout))[::-1] + print(f"\tnew lbry://{claim_name}#{claim_hash.hex()} ({tx_num} {txo.amount})") else: claim_hash = txo.claim_hash[::-1] - - signing_channel_hash = None - channel_claims_count = 0 - activation_delay = self.db.get_activation_delay(claim_hash, claim_name) - if activation_delay == 0: - zero_delay_claims[(claim_name, claim_hash)] = tx_count, idx - # else: - # print("delay activation ", claim_name, activation_delay, height) - - activation_height = activation_delay + height + print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} ({tx_num} {txo.amount})") try: signable = txo.signable except: # google.protobuf.message.DecodeError: Could not parse JSON. signable = None + ops = [] + signing_channel_hash = None if signable and signable.signing_channel_hash: signing_channel_hash = txo.signable.signing_channel_hash[::-1] - # if signing_channel_hash in self.pending_claim_txos: - # pending_channel = self.pending_claims[self.pending_claim_txos[signing_channel_hash]] - # channel_claims_count = pending_channel. - - channel_claims_count = self.db.get_claims_in_channel_count(signing_channel_hash) + 1 - if script.is_claim_name: - support_amount = 0 - root_tx_num, root_idx = tx_count, idx + if txo.script.is_claim_name: + root_tx_num, root_idx = tx_num, nout else: if claim_hash not in spent_claims: print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}") return [] - support_amount = self.db.get_support_amount(claim_hash) (prev_tx_num, prev_idx, _) = spent_claims.pop(claim_hash) - # print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} {tx_hash[::-1].hex()} {txout.value}") - + print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} {tx_hash[::-1].hex()} {txo.amount}") if (prev_tx_num, prev_idx) in self.pending_claims: previous_claim = self.pending_claims.pop((prev_tx_num, prev_idx)) - root_tx_num = previous_claim.root_claim_tx_num - root_idx = previous_claim.root_claim_tx_position - # prev_amount = previous_claim.amount + root_tx_num, root_idx = previous_claim.root_claim_tx_num, previous_claim.root_claim_tx_position else: - k, v = self.db.get_root_claim_txo_and_current_amount( + k, v = self.db.get_claim_txo( claim_hash ) - root_tx_num = v.root_tx_num - root_idx = v.root_position - prev_amount = v.amount - + root_tx_num, root_idx = v.root_tx_num, v.root_position + activation = self.db.get_activation(prev_tx_num, prev_idx) + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, claim_name, v.amount + ).get_remove_activate_ops() + ) pending = StagedClaimtrieItem( - claim_name, claim_hash, txout.value, support_amount + txout.value, - activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx, - signing_channel_hash, channel_claims_count + claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, + root_idx, signing_channel_hash ) + self.pending_claims[(tx_num, nout)] = pending + self.pending_claim_txos[claim_hash] = (tx_num, nout) + ops.extend(pending.get_add_claim_utxo_ops()) + return ops - self.pending_claims[(tx_count, idx)] = pending - self.pending_claim_txos[claim_hash] = (tx_count, idx) - self.effective_amount_changes[claim_hash].append(txout.value) - return pending.get_add_claim_utxo_ops() - - def _add_support(self, height, txo, txout, idx, tx_count, - zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']: + def _add_support(self, txo: 'Output', tx_num: int, nout: int) -> List['RevertableOp']: supported_claim_hash = txo.claim_hash[::-1] + self.pending_supports[supported_claim_hash].append((tx_num, nout)) + self.pending_support_txos[(tx_num, nout)] = supported_claim_hash, txo.amount + print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}") + return StagedClaimtrieSupport( + supported_claim_hash, tx_num, nout, txo.amount + ).get_add_support_utxo_ops() - claim_info = self.db.get_root_claim_txo_and_current_amount( - supported_claim_hash - ) - controlling_claim = None - supported_tx_num = supported_position = supported_activation_height = supported_name = None - if claim_info: - k, v = claim_info - supported_name = v.name - supported_tx_num = k.tx_num - supported_position = k.position - supported_activation_height = v.activation - controlling_claim = self.db.get_controlling_claim(v.name) - - if supported_claim_hash in self.effective_amount_changes: - # print(f"\tsupport claim {supported_claim_hash.hex()} {txout.value}") - self.effective_amount_changes[supported_claim_hash].append(txout.value) - self.pending_supports[supported_claim_hash].add((tx_count, idx)) - self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value - return StagedClaimtrieSupport( - supported_claim_hash, tx_count, idx, txout.value - ).get_add_support_utxo_ops() - elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon: - # print(f"\tsupport claim {supported_claim_hash.hex()} {txout.value}") - ops = [] - if claim_info: - starting_amount = self.db.get_effective_amount(supported_claim_hash) - - if supported_claim_hash not in self.effective_amount_changes: - self.effective_amount_changes[supported_claim_hash].append(starting_amount) - self.effective_amount_changes[supported_claim_hash].append(txout.value) - supported_amount = self._get_pending_effective_amount(supported_claim_hash) - - if controlling_claim and supported_claim_hash != controlling_claim.claim_hash: - if supported_amount + txo.amount > self._get_pending_effective_amount(controlling_claim.claim_hash): - # takeover could happen - if (supported_name, supported_claim_hash) not in zero_delay_claims: - takeover_delay = get_delay_for_name(height - supported_activation_height) - if takeover_delay == 0: - zero_delay_claims[(supported_name, supported_claim_hash)] = ( - supported_tx_num, supported_position - ) - else: - ops.append( - RevertablePut( - *Prefixes.pending_activation.pack_item( - height + takeover_delay, supported_tx_num, supported_position, - supported_claim_hash, supported_name - ) - ) - ) - - self.pending_supports[supported_claim_hash].add((tx_count, idx)) - self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value - # print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") - ops.extend(StagedClaimtrieSupport( - supported_claim_hash, tx_count, idx, txout.value - ).get_add_support_utxo_ops()) - return ops - else: - print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}") + def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output', + spent_claims: typing.Dict[bytes, Tuple[int, int, str]]) -> List['RevertableOp']: + if txo.script.is_claim_name or txo.script.is_update_claim: + return self._add_claim_or_update(height, txo, tx_hash, tx_num, nout, spent_claims) + elif txo.script.is_support_claim or txo.script.is_support_claim_data: + return self._add_support(txo, tx_num, nout) return [] - def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script, - spent_claims: typing.Dict[bytes, Tuple[int, int, str]], - zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']: - if script.is_claim_name or script.is_update_claim: - return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims, - zero_delay_claims) - elif script.is_support_claim or script.is_support_claim_data: - return self._add_support(height, txo, txout, idx, tx_count, zero_delay_claims) - return [] - - def _remove_support(self, txin, zero_delay_claims): + def _spend_support_txo(self, txin): txin_num = self.db.transaction_num_mapping[txin.prev_hash] - supported_name = None if (txin_num, txin.prev_idx) in self.pending_support_txos: spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx)) - supported_name = self._get_pending_claim_name(spent_support) self.pending_supports[spent_support].remove((txin_num, txin.prev_idx)) - else: - spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) - if spent_support: - supported_name = self._get_pending_claim_name(spent_support) - - if spent_support and support_amount is not None and spent_support not in self.pending_abandon: - controlling = self.db.get_controlling_claim(supported_name) - if controlling: - bid_queue = { - claim_hash: self._get_pending_effective_amount(claim_hash) - for claim_hash in self.db.get_claims_for_name(supported_name) - if claim_hash not in self.pending_abandon - } - bid_queue[spent_support] -= support_amount - sorted_claims = sorted( - list(bid_queue.keys()), key=lambda claim_hash: bid_queue[claim_hash], reverse=True - ) - if controlling.claim_hash == spent_support and sorted_claims.index(controlling.claim_hash) > 0: - print("takeover due to abandoned support") - - # print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx}) {supported_name}") - if spent_support not in self.effective_amount_changes: - assert spent_support not in self.pending_claims - prev_effective_amount = self.db.get_effective_amount(spent_support) - self.effective_amount_changes[spent_support].append(prev_effective_amount) - self.effective_amount_changes[spent_support].append(-support_amount) + supported_name = self._get_pending_claim_name(spent_support) + print(f"\tspent support for lbry://{supported_name}#{spent_support.hex()}") + self.pending_removed_support[supported_name][spent_support].append((txin_num, txin.prev_idx)) return StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount ).get_spend_support_txo_ops() + spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) + if spent_support: + supported_name = self._get_pending_claim_name(spent_support) + self.pending_removed_support[supported_name][spent_support].append((txin_num, txin.prev_idx)) + activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True) + self.removed_active_support[spent_support].append(support_amount) + print(f"\tspent support for lbry://{supported_name}#{spent_support.hex()} activation:{activation} {support_amount}") + return StagedClaimtrieSupport( + spent_support, txin_num, txin.prev_idx, support_amount + ).get_spend_support_txo_ops() + StagedActivation( + ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, + support_amount + ).get_remove_activate_ops() return [] - def _remove_claim(self, txin, spent_claims, zero_delay_claims): + def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]): txin_num = self.db.transaction_num_mapping[txin.prev_hash] if (txin_num, txin.prev_idx) in self.pending_claims: spent = self.pending_claims[(txin_num, txin.prev_idx)] - name = spent.name - spent_claims[spent.claim_hash] = (txin_num, txin.prev_idx, name) - # print(f"spend lbry://{name}#{spent.claim_hash.hex()}") else: - spent_claim_hash_and_name = self.db.claim_hash_and_name_from_txo( + spent_claim_hash_and_name = self.db.get_claim_from_txo( txin_num, txin.prev_idx ) if not spent_claim_hash_and_name: # txo is not a claim return [] - prev_claim_hash = spent_claim_hash_and_name.claim_hash - - prev_signing_hash = self.db.get_channel_for_claim(prev_claim_hash) - prev_claims_in_channel_count = None - if prev_signing_hash: - prev_claims_in_channel_count = self.db.get_claims_in_channel_count( - prev_signing_hash - ) - prev_effective_amount = self.db.get_effective_amount( - prev_claim_hash - ) - k, v = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) - claim_root_tx_num = v.root_tx_num - claim_root_idx = v.root_position - prev_amount = v.amount - name = v.name - tx_num = k.tx_num - position = k.position - activation_height = v.activation - height = bisect_right(self.db.tx_counts, tx_num) + claim_hash = spent_claim_hash_and_name.claim_hash + signing_hash = self.db.get_channel_for_claim(claim_hash) + k, v = self.db.get_claim_txo(claim_hash) spent = StagedClaimtrieItem( - name, prev_claim_hash, prev_amount, prev_effective_amount, - activation_height, get_expiration_height(height), txin_num, txin.prev_idx, claim_root_tx_num, - claim_root_idx, prev_signing_hash, prev_claims_in_channel_count + v.name, claim_hash, v.amount, + self.coin.get_expiration_height(bisect_right(self.db.tx_counts, txin_num)), + txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash ) - spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name) - # print(f"spend lbry://{spent_claims[prev_claim_hash][2]}#{prev_claim_hash.hex()}") - if spent.claim_hash not in self.effective_amount_changes: - self.effective_amount_changes[spent.claim_hash].append(spent.effective_amount) - self.effective_amount_changes[spent.claim_hash].append(-spent.amount) - if (name, spent.claim_hash) in zero_delay_claims: - zero_delay_claims.pop((name, spent.claim_hash)) + spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name) + print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") return spent.get_spend_claim_txo_ops() - def _remove_claim_or_support(self, txin, spent_claims, zero_delay_claims): - spend_claim_ops = self._remove_claim(txin, spent_claims, zero_delay_claims) + def _spend_claim_or_support_txo(self, txin, spent_claims): + spend_claim_ops = self._spend_claim_txo(txin, spent_claims) if spend_claim_ops: return spend_claim_ops - return self._remove_support(txin, zero_delay_claims) + return self._spend_support_txo(txin) - def _abandon(self, spent_claims) -> typing.Tuple[List['RevertableOp'], typing.Set[str]]: + def _abandon_claim(self, claim_hash, tx_num, nout, name) -> List['RevertableOp']: + if (tx_num, nout) in self.pending_claims: + pending = self.pending_claims.pop((tx_num, nout)) + self.staged_pending_abandoned[pending.claim_hash] = pending + claim_root_tx_num, claim_root_idx = pending.root_claim_tx_num, pending.root_claim_tx_position + prev_amount, prev_signing_hash = pending.amount, pending.signing_hash + expiration = self.coin.get_expiration_height(self.height) + else: + k, v = self.db.get_claim_txo( + claim_hash + ) + claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount + prev_signing_hash = self.db.get_channel_for_claim(claim_hash) + expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num)) + self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem( + name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num, + claim_root_idx, prev_signing_hash + ) + + self.pending_supports[claim_hash].clear() + self.pending_supports.pop(claim_hash) + + return staged.get_abandon_ops(self.db.db) + + def _abandon(self, spent_claims) -> List['RevertableOp']: # Handle abandoned claims ops = [] - controlling_claims = {} - need_takeover = set() + for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items(): + print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {tx_num} {nout}") + ops.extend(self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)) + return ops - for abandoned_claim_hash, (prev_tx_num, prev_idx, name) in spent_claims.items(): - # print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {prev_tx_num} {prev_idx}") - - if (prev_tx_num, prev_idx) in self.pending_claims: - pending = self.pending_claims.pop((prev_tx_num, prev_idx)) - self.staged_pending_abandoned[pending.claim_hash] = pending - claim_root_tx_num = pending.root_claim_tx_num - claim_root_idx = pending.root_claim_tx_position - prev_amount = pending.amount - prev_signing_hash = pending.signing_hash - prev_effective_amount = pending.effective_amount - prev_claims_in_channel_count = pending.claims_in_channel_count - else: - k, v = self.db.get_root_claim_txo_and_current_amount( - abandoned_claim_hash - ) - claim_root_tx_num = v.root_tx_num - claim_root_idx = v.root_position - prev_amount = v.amount - prev_signing_hash = self.db.get_channel_for_claim(abandoned_claim_hash) - prev_claims_in_channel_count = None - if prev_signing_hash: - prev_claims_in_channel_count = self.db.get_claims_in_channel_count( - prev_signing_hash - ) - prev_effective_amount = self.db.get_effective_amount( - abandoned_claim_hash - ) - - if name not in controlling_claims: - controlling_claims[name] = self.db.get_controlling_claim(name) - controlling = controlling_claims[name] - if controlling and controlling.claim_hash == abandoned_claim_hash: - need_takeover.add(name) - # print("needs takeover") - - for (support_tx_num, support_tx_idx) in self.pending_supports[abandoned_claim_hash]: - _, support_amount = self.pending_support_txos.pop((support_tx_num, support_tx_idx)) - ops.extend( - StagedClaimtrieSupport( - abandoned_claim_hash, support_tx_num, support_tx_idx, support_amount - ).get_spend_support_txo_ops() - ) - # print(f"\tremove pending support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}") - self.pending_supports[abandoned_claim_hash].clear() - self.pending_supports.pop(abandoned_claim_hash) - - for (support_tx_num, support_tx_idx, support_amount) in self.db.get_supports(abandoned_claim_hash): - ops.extend( - StagedClaimtrieSupport( - abandoned_claim_hash, support_tx_num, support_tx_idx, support_amount - ).get_spend_support_txo_ops() - ) - # print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}") - - height = bisect_right(self.db.tx_counts, prev_tx_num) - activation_height = 0 - - if abandoned_claim_hash in self.effective_amount_changes: - # print("pop") - self.effective_amount_changes.pop(abandoned_claim_hash) - self.pending_abandon.add(abandoned_claim_hash) - - # print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}, {len(need_takeover)} names need takeovers") - ops.extend( - StagedClaimtrieItem( - name, abandoned_claim_hash, prev_amount, prev_effective_amount, - activation_height, get_expiration_height(height), prev_tx_num, prev_idx, claim_root_tx_num, - claim_root_idx, prev_signing_hash, prev_claims_in_channel_count - ).get_abandon_ops(self.db.db) - ) - return ops, need_takeover - - def _expire_claims(self, height: int, zero_delay_claims): + def _expire_claims(self, height: int): expired = self.db.get_expired_by_height(height) spent_claims = {} ops = [] - names_needing_takeover = set() for expired_claim_hash, (tx_num, position, name, txi) in expired.items(): if (tx_num, position) not in self.pending_claims: - ops.extend(self._remove_claim(txi, spent_claims, zero_delay_claims)) + ops.extend(self._spend_claim_txo(txi, spent_claims)) if expired: # do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned - abandon_ops, _names_needing_takeover = self._abandon(spent_claims) - if abandon_ops: - ops.extend(abandon_ops) - names_needing_takeover.update(_names_needing_takeover) ops.extend(self._abandon(spent_claims)) - return ops, names_needing_takeover + return ops - def _get_pending_claim_amount(self, claim_hash: bytes) -> int: - if claim_hash in self.pending_claim_txos: - return self.pending_claims[self.pending_claim_txos[claim_hash]].amount - return self.db.get_claim_amount(claim_hash) + def _get_pending_claim_amount(self, name: str, claim_hash: bytes) -> int: + if (name, claim_hash) in self.staged_activated_claim: + return self.staged_activated_claim[(name, claim_hash)] + return self.db._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.height + 1) - def _get_pending_claim_name(self, claim_hash: bytes) -> str: + def _get_pending_claim_name(self, claim_hash: bytes) -> Optional[str]: assert claim_hash is not None if claim_hash in self.pending_claims: return self.pending_claims[claim_hash].name - claim = self.db.get_claim_from_txo(claim_hash) - return claim.name + claim_info = self.db.get_claim_txo(claim_hash) + if claim_info: + return claim_info[1].name - def _get_pending_effective_amount(self, claim_hash: bytes) -> int: - claim_amount = self._get_pending_claim_amount(claim_hash) or 0 - support_amount = self.db.get_support_amount(claim_hash) or 0 - return claim_amount + support_amount + sum( - self.pending_support_txos[support_txnum, support_n][1] - for (support_txnum, support_n) in self.pending_supports.get(claim_hash, []) - ) # TODO: subtract pending spend supports + def _get_pending_supported_amount(self, claim_hash: bytes) -> int: + support_amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.height + 1) or 0 + amount = support_amount + sum( + self.staged_activated_support.get(claim_hash, []) + ) + if claim_hash in self.removed_active_support: + return amount - sum(self.removed_active_support[claim_hash]) + return amount - def _get_name_takeover_ops(self, height: int, name: str, - activated_claims: typing.Set[bytes]) -> List['RevertableOp']: - controlling = self.db.get_controlling_claim(name) - if not controlling or controlling.claim_hash in self.pending_abandon: - # print("no controlling claim for ", name) - bid_queue = { - claim_hash: self._get_pending_effective_amount(claim_hash) for claim_hash in activated_claims - } - winning_claim = max(bid_queue, key=lambda k: bid_queue[k]) - if winning_claim in self.pending_claim_txos: - s = self.pending_claims[self.pending_claim_txos[winning_claim]] - else: - s = self.db.make_staged_claim_item(winning_claim) - ops = [] - if s.activation_height > height: - ops.extend(get_force_activate_ops( - name, s.tx_num, s.position, s.claim_hash, s.root_claim_tx_num, s.root_claim_tx_position, - s.amount, s.effective_amount, s.activation_height, height - )) - ops.extend(get_takeover_name_ops(name, winning_claim, height)) - return ops - else: - # print(f"current controlling claim for {name}#{controlling.claim_hash.hex()}") - controlling_effective_amount = self._get_pending_effective_amount(controlling.claim_hash) - bid_queue = { - claim_hash: self._get_pending_effective_amount(claim_hash) for claim_hash in activated_claims - } - highest_newly_activated = max(bid_queue, key=lambda k: bid_queue[k]) - if bid_queue[highest_newly_activated] > controlling_effective_amount: - # print(f"takeover controlling claim for {name}#{controlling.claim_hash.hex()}") - return get_takeover_name_ops(name, highest_newly_activated, height, controlling) - print(bid_queue[highest_newly_activated], controlling_effective_amount) - # print("no takeover") - return [] + def _get_pending_effective_amount(self, name: str, claim_hash: bytes) -> int: + claim_amount = self._get_pending_claim_amount(name, claim_hash) + support_amount = self._get_pending_supported_amount(claim_hash) + return claim_amount + support_amount - def _get_takeover_ops(self, height: int, zero_delay_claims) -> List['RevertableOp']: + def _get_takeover_ops(self, height: int) -> List['RevertableOp']: ops = [] - pending = defaultdict(set) - - # get non delayed takeovers for new names - for (name, claim_hash) in zero_delay_claims: - if claim_hash not in self.pending_abandon: - pending[name].add(claim_hash) - # print("zero delay activate", name, claim_hash.hex()) # get takeovers from claims activated at this block - for activated in self.db.get_activated_claims_at_height(height): - if activated.claim_hash not in self.pending_abandon: - pending[activated.name].add(activated.claim_hash) - # print("delayed activate") + activated_at_height = self.db.get_activated_at_height(height) + controlling_claims = {} + abandoned_need_takeover = [] + abandoned_support_check_need_takeover = defaultdict(list) - # get takeovers from supports for controlling claims being abandoned - for abandoned_claim_hash in self.pending_abandon: - if abandoned_claim_hash in self.staged_pending_abandoned: - abandoned = self.staged_pending_abandoned[abandoned_claim_hash] - controlling = self.db.get_controlling_claim(abandoned.name) - if controlling and controlling.claim_hash == abandoned_claim_hash and abandoned.name not in pending: - pending[abandoned.name].update(self.db.get_claims_for_name(abandoned.name)) + def get_controlling(_name): + if _name not in controlling_claims: + _controlling = self.db.get_controlling_claim(_name) + controlling_claims[_name] = _controlling else: - k, v = self.db.get_root_claim_txo_and_current_amount(abandoned_claim_hash) - controlling_claim = self.db.get_controlling_claim(v.name) - if controlling_claim and abandoned_claim_hash == controlling_claim.claim_hash and v.name not in pending: - pending[v.name].update(self.db.get_claims_for_name(v.name)) - # print("check abandoned winning") + _controlling = controlling_claims[_name] + return _controlling + # determine names needing takeover/deletion due to controlling claims being abandoned + # and add ops to deactivate abandoned claims + for claim_hash, staged in self.staged_pending_abandoned.items(): + controlling = get_controlling(staged.name) + if controlling and controlling.claim_hash == claim_hash: + abandoned_need_takeover.append(staged.name) + print(f"\t{staged.name} needs takeover") + activation = self.db.get_activation(staged.tx_num, staged.position) + if activation > 0: + # removed queued future activation from the db + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, + activation, staged.name, staged.amount + ).get_remove_activate_ops() + ) + else: + # it hadn't yet been activated, db returns -1 for non-existent txos + pass + # build set of controlling claims that had activated supports spent to check them for takeovers later + for claim_hash, amounts in self.removed_active_support.items(): + name = self._get_pending_claim_name(claim_hash) + controlling = get_controlling(name) + if controlling and controlling.claim_hash == claim_hash and name not in abandoned_need_takeover: + abandoned_support_check_need_takeover[(name, claim_hash)].extend(amounts) - # get takeovers from controlling claims being abandoned + # prepare to activate or delay activation of the pending claims being added this block + for (tx_num, nout), staged in self.pending_claims.items(): + controlling = get_controlling(staged.name) + delay = 0 + if not controlling or staged.claim_hash == controlling.claim_hash or \ + controlling.claim_hash in abandoned_need_takeover: + pass + else: + controlling_effective_amount = self._get_pending_effective_amount(staged.name, controlling.claim_hash) + amount = self._get_pending_effective_amount(staged.name, staged.claim_hash) + delay = 0 + # if this is an OP_CLAIM or the amount appears to trigger a takeover, delay + if not staged.is_update or (amount > controlling_effective_amount): + delay = self.coin.get_delay_for_name(height - controlling.height) + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, + height + delay, staged.name, staged.amount + ).get_activate_ops() + ) + if delay == 0: # if delay was 0 it needs to be considered for takeovers + activated_at_height[PendingActivationValue(staged.claim_hash, staged.name)].append( + PendingActivationKey(height, ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout) + ) + + # and the supports + for (tx_num, nout), (claim_hash, amount) in self.pending_support_txos.items(): + if claim_hash in self.staged_pending_abandoned: + continue + elif claim_hash in self.pending_claim_txos: + name = self.pending_claims[self.pending_claim_txos[claim_hash]].name + is_update = self.pending_claims[self.pending_claim_txos[claim_hash]].is_update + else: + k, v = self.db.get_claim_txo(claim_hash) + name = v.name + is_update = (v.root_tx_num, v.root_position) != (k.tx_num, k.position) + + controlling = get_controlling(name) + delay = 0 + if not controlling or claim_hash == controlling.claim_hash: + pass + elif not is_update or self._get_pending_effective_amount(staged.name, + claim_hash) > self._get_pending_effective_amount(staged.name, controlling.claim_hash): + delay = self.coin.get_delay_for_name(height - controlling.height) + if delay == 0: + activated_at_height[PendingActivationValue(claim_hash, name)].append( + PendingActivationKey(height + delay, ACTIVATED_SUPPORT_TXO_TYPE, tx_num, nout) + ) + ops.extend( + StagedActivation( + ACTIVATED_SUPPORT_TXO_TYPE, claim_hash, tx_num, nout, + height + delay, name, amount + ).get_activate_ops() + ) + + # add the activation/delayed-activation ops + for activated, activated_txos in activated_at_height.items(): + controlling = get_controlling(activated.name) + + if activated.claim_hash in self.staged_pending_abandoned: + continue + reactivate = False + if not controlling or controlling.claim_hash == activated.claim_hash: + # there is no delay for claims to a name without a controlling value or to the controlling value + reactivate = True + for activated_txo in activated_txos: + if activated_txo.is_support and (activated_txo.tx_num, activated_txo.position) in \ + self.pending_removed_support[activated.name][activated.claim_hash]: + print("\tskip activate support for pending abandoned claim") + continue + if activated_txo.is_claim: + txo_type = ACTIVATED_CLAIM_TXO_TYPE + txo_tup = (activated_txo.tx_num, activated_txo.position) + if txo_tup in self.pending_claims: + amount = self.pending_claims[txo_tup].amount + else: + amount = self.db.get_claim_txo_amount( + activated.claim_hash, activated_txo.tx_num, activated_txo.position + ) + self.staged_activated_claim[(activated.name, activated.claim_hash)] = amount + else: + txo_type = ACTIVATED_SUPPORT_TXO_TYPE + txo_tup = (activated_txo.tx_num, activated_txo.position) + if txo_tup in self.pending_support_txos: + amount = self.pending_support_txos[txo_tup][1] + else: + amount = self.db.get_support_txo_amount( + activated.claim_hash, activated_txo.tx_num, activated_txo.position + ) + self.staged_activated_support[activated.claim_hash].append(amount) + self.pending_activated[activated.name][activated.claim_hash].append((activated_txo, amount)) + print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} " + f"lbry://{activated.name}#{activated.claim_hash.hex()} @ {activated_txo.height}") + if reactivate: + ops.extend( + StagedActivation( + txo_type, activated.claim_hash, activated_txo.tx_num, activated_txo.position, + activated_txo.height, activated.name, amount + ).get_activate_ops() + ) + + # go through claims where the controlling claim or supports to the controlling claim have been abandoned + # check if takeovers are needed or if the name node is now empty + need_reactivate_if_takes_over = {} + for need_takeover in abandoned_need_takeover: + existing = self.db.get_claim_txos_for_name(need_takeover) + has_candidate = False + # add existing claims to the queue for the takeover + # track that we need to reactivate these if one of them becomes controlling + for candidate_claim_hash, (tx_num, nout) in existing.items(): + if candidate_claim_hash in self.staged_pending_abandoned: + continue + has_candidate = True + existing_activation = self.db.get_activation(tx_num, nout) + activate_key = PendingActivationKey( + existing_activation, ACTIVATED_CLAIM_TXO_TYPE, tx_num, nout + ) + self.pending_activated[need_takeover][candidate_claim_hash].append(( + activate_key, self.db.get_claim_txo_amount(candidate_claim_hash, tx_num, nout) + )) + need_reactivate_if_takes_over[(need_takeover, candidate_claim_hash)] = activate_key + print(f"\tcandidate to takeover abandoned controlling claim for lbry://{need_takeover} - " + f"{activate_key.tx_num}:{activate_key.position} {activate_key.is_claim}") + if not has_candidate: + # remove name takeover entry, the name is now unclaimed + controlling = get_controlling(need_takeover) + ops.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) + + # process takeovers from the combined newly added and previously scheduled claims + checked_names = set() + for name, activated in self.pending_activated.items(): + checked_names.add(name) + if name in abandoned_need_takeover: + print(f'\tabandoned {name} need takeover') + controlling = controlling_claims[name] + amounts = { + claim_hash: self._get_pending_effective_amount(name, claim_hash) + for claim_hash in activated.keys() if claim_hash not in self.staged_pending_abandoned + } + if controlling and controlling.claim_hash not in self.staged_pending_abandoned: + amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash) + winning = max(amounts, key=lambda x: amounts[x]) + if not controlling or (winning != controlling.claim_hash and name in abandoned_need_takeover) or ((winning != controlling.claim_hash) and + (amounts[winning] > amounts[controlling.claim_hash])): + if (name, winning) in need_reactivate_if_takes_over: + previous_pending_activate = need_reactivate_if_takes_over[(name, winning)] + amount = self.db.get_claim_txo_amount( + winning, previous_pending_activate.tx_num, previous_pending_activate.position + ) + if winning in self.pending_claim_txos: + tx_num, position = self.pending_claim_txos[winning] + amount = self.pending_claims[(tx_num, position)].amount + else: + tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position + if previous_pending_activate.height > height: + # the claim had a pending activation in the future, move it to now + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning, tx_num, + position, previous_pending_activate.height, name, amount + ).get_remove_activate_ops() + ) + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning, tx_num, + position, height, name, amount + ).get_activate_ops() + ) + ops.extend(get_takeover_name_ops(name, winning, height)) + else: + ops.extend(get_takeover_name_ops(name, winning, height)) + + elif winning == controlling.claim_hash: + print("\tstill winning") + pass + else: + print("\tno takeover") + pass + + # handle remaining takeovers from abandoned supports + for (name, claim_hash), amounts in abandoned_support_check_need_takeover.items(): + if name in checked_names: + continue + checked_names.add(name) + + controlling = get_controlling(name) + + amounts = { + claim_hash: self._get_pending_effective_amount(name, claim_hash) + for claim_hash in self.db.get_claims_for_name(name) if claim_hash not in self.staged_pending_abandoned + } + if controlling and controlling.claim_hash not in self.staged_pending_abandoned: + amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash) + winning = max(amounts, key=lambda x: amounts[x]) + if (controlling and winning != controlling.claim_hash) or (not controlling and winning): + print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") + ops.extend(get_takeover_name_ops(name, winning, height)) - for name, activated_claims in pending.items(): - ops.extend(self._get_name_takeover_ops(height, name, activated_claims)) return ops def advance_block(self, block): - # print("advance ", height) height = self.height + 1 + print("advance ", height) txs: List[Tuple[Tx, bytes]] = block.transactions block_hash = self.coin.header_hash(block.header) @@ -881,34 +875,32 @@ class BlockProcessor: undo_info_append(cache_value) append_hashX(cache_value[:-12]) - spend_claim_or_support_ops = self._remove_claim_or_support(txin, spent_claims, zero_delay_claims) + spend_claim_or_support_ops = self._spend_claim_or_support_txo(txin, spent_claims) if spend_claim_or_support_ops: claimtrie_stash_extend(spend_claim_or_support_ops) # Add the new UTXOs - for idx, txout in enumerate(tx.outputs): + for nout, txout in enumerate(tx.outputs): # Get the hashX. Ignore unspendable outputs hashX = hashX_from_script(txout.pk_script) if hashX: append_hashX(hashX) - put_utxo(tx_hash + pack('= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}' - claimtrie_stash.extend( - self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount) - ) - undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash) self.claimtrie_stash.extend(claimtrie_stash) # print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash))) @@ -961,14 +945,18 @@ class BlockProcessor: self.db.flush_dbs(self.flush_data()) - self.effective_amount_changes.clear() + # self.effective_amount_changes.clear() self.pending_claims.clear() self.pending_claim_txos.clear() self.pending_supports.clear() self.pending_support_txos.clear() - self.pending_abandon.clear() + self.pending_removed_support.clear() self.staged_pending_abandoned.clear() + self.removed_active_support.clear() + self.staged_activated_support.clear() + self.staged_activated_claim.clear() + self.pending_activated.clear() for cache in self.search_cache.values(): cache.clear() diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index 52b9f4e33..2a47fc5c5 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -12,11 +12,13 @@ class DB_PREFIXES(enum.Enum): channel_to_claim = b'J' claim_short_id_prefix = b'F' - claim_effective_amount_prefix = b'D' + # claim_effective_amount_prefix = b'D' claim_expiration = b'O' claim_takeover = b'P' pending_activation = b'Q' + activated_claim_and_support = b'R' + active_amount = b'S' undo_claimtrie = b'M' diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 2634ed595..70f377d9f 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -45,40 +45,52 @@ class StagedClaimtrieSupport(typing.NamedTuple): return self._get_add_remove_support_utxo_ops(add=False) -def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_effective_amount: int, tx_num: int, - position: int, root_tx_num: int, root_position: int, claim_hash: bytes, - activation_height: int, prev_activation_height: int, - signing_hash: Optional[bytes] = None, - claims_in_channel_count: Optional[int] = None): - assert root_position != root_tx_num, f"{tx_num} {position} {root_tx_num} {root_tx_num}" - ops = [ - RevertableDelete( - *Prefixes.claim_effective_amount.pack_item( - name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position, - prev_activation_height +class StagedActivation(typing.NamedTuple): + txo_type: int + claim_hash: bytes + tx_num: int + position: int + activation_height: int + name: str + amount: int + + def _get_add_remove_activate_ops(self, add=True): + op = RevertablePut if add else RevertableDelete + print(f"\t{'add' if add else 'remove'} {self.txo_type}, {self.tx_num}, {self.position}, activation={self.activation_height}, {self.name}") + return [ + op( + *Prefixes.activated.pack_item( + self.txo_type, self.tx_num, self.position, self.activation_height, self.claim_hash, self.name + ) + ), + op( + *Prefixes.pending_activation.pack_item( + self.activation_height, self.txo_type, self.tx_num, self.position, + self.claim_hash, self.name + ) + ), + op( + *Prefixes.active_amount.pack_item( + self.claim_hash, self.txo_type, self.activation_height, self.tx_num, self.position, self.amount + ) ) - ), - RevertablePut( - *Prefixes.claim_effective_amount.pack_item( - name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position, - activation_height + ] + + def get_activate_ops(self) -> typing.List[RevertableOp]: + return self._get_add_remove_activate_ops(add=True) + + def get_remove_activate_ops(self) -> typing.List[RevertableOp]: + return self._get_add_remove_activate_ops(add=False) + + +def get_remove_name_ops(name: str, claim_hash: bytes, height: int) -> typing.List[RevertableDelete]: + return [ + RevertableDelete( + *Prefixes.claim_takeover.pack_item( + name, claim_hash, height ) ) ] - if signing_hash: - ops.extend([ - RevertableDelete( - *Prefixes.channel_to_claim.pack_item( - signing_hash, name, prev_effective_amount, tx_num, position, claim_hash, claims_in_channel_count - ) - ), - RevertablePut( - *Prefixes.channel_to_claim.pack_item( - signing_hash, name, new_effective_amount, tx_num, position, claim_hash, claims_in_channel_count - ) - ) - ]) - return ops def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, @@ -107,76 +119,16 @@ def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, ] -def get_force_activate_ops(name: str, tx_num: int, position: int, claim_hash: bytes, root_claim_tx_num: int, - root_claim_tx_position: int, amount: int, effective_amount: int, - prev_activation_height: int, new_activation_height: int): - return [ - # delete previous - RevertableDelete( - *Prefixes.claim_effective_amount.pack_item( - name, effective_amount, tx_num, position, claim_hash, - root_claim_tx_num, root_claim_tx_position, prev_activation_height - ) - ), - RevertableDelete( - *Prefixes.claim_to_txo.pack_item( - claim_hash, tx_num, position, root_claim_tx_num, root_claim_tx_position, - amount, prev_activation_height, name - ) - ), - RevertableDelete( - *Prefixes.claim_short_id.pack_item( - name, claim_hash, root_claim_tx_num, root_claim_tx_position, tx_num, - position, prev_activation_height - ) - ), - RevertableDelete( - *Prefixes.pending_activation.pack_item( - prev_activation_height, tx_num, position, claim_hash, name - ) - ), - - # insert new - RevertablePut( - *Prefixes.claim_effective_amount.pack_item( - name, effective_amount, tx_num, position, claim_hash, - root_claim_tx_num, root_claim_tx_position, new_activation_height - ) - ), - RevertablePut( - *Prefixes.claim_to_txo.pack_item( - claim_hash, tx_num, position, root_claim_tx_num, root_claim_tx_position, - amount, new_activation_height, name - ) - ), - RevertablePut( - *Prefixes.claim_short_id.pack_item( - name, claim_hash, root_claim_tx_num, root_claim_tx_position, tx_num, - position, new_activation_height - ) - ), - RevertablePut( - *Prefixes.pending_activation.pack_item( - new_activation_height, tx_num, position, claim_hash, name - ) - ) - - ] - - class StagedClaimtrieItem(typing.NamedTuple): name: str claim_hash: bytes amount: int - effective_amount: int - activation_height: int expiration_height: int tx_num: int position: int root_claim_tx_num: int root_claim_tx_position: int signing_hash: Optional[bytes] - claims_in_channel_count: Optional[int] @property def is_update(self) -> bool: @@ -191,25 +143,11 @@ class StagedClaimtrieItem(typing.NamedTuple): """ op = RevertablePut if add else RevertableDelete ops = [ - # url resolution by effective amount - op( - *Prefixes.claim_effective_amount.pack_item( - self.name, self.effective_amount, self.tx_num, self.position, self.claim_hash, - self.root_claim_tx_num, self.root_claim_tx_position, self.activation_height - ) - ), # claim tip by claim hash op( *Prefixes.claim_to_txo.pack_item( self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position, - self.amount, self.activation_height, self.name - ) - ), - # short url resolution - op( - *Prefixes.claim_short_id.pack_item( - self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num, - self.position, self.activation_height + self.amount, self.name ) ), # claim hash by txo @@ -223,15 +161,16 @@ class StagedClaimtrieItem(typing.NamedTuple): self.name ) ), - # claim activation + # short url resolution op( - *Prefixes.pending_activation.pack_item( - self.activation_height, self.tx_num, self.position, self.claim_hash, self.name + *Prefixes.claim_short_id.pack_item( + self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num, + self.position ) ) ] - if self.signing_hash and self.claims_in_channel_count is not None: - # claims_in_channel_count can be none if the channel doesnt exist + + if self.signing_hash: ops.extend([ # channel by stream op( @@ -240,8 +179,7 @@ class StagedClaimtrieItem(typing.NamedTuple): # stream by channel op( *Prefixes.channel_to_claim.pack_item( - self.signing_hash, self.name, self.effective_amount, self.tx_num, self.position, - self.claim_hash, self.claims_in_channel_count + self.signing_hash, self.name, self.tx_num, self.position, self.claim_hash ) ) ]) @@ -257,8 +195,8 @@ class StagedClaimtrieItem(typing.NamedTuple): if not self.signing_hash: return [] return [ - RevertableDelete(*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)) - ] + delete_prefix(db, DB_PREFIXES.channel_to_claim.value + self.signing_hash) + RevertableDelete(*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)) + ] + delete_prefix(db, DB_PREFIXES.channel_to_claim.value + self.signing_hash) def get_abandon_ops(self, db) -> typing.List[RevertableOp]: packed_name = length_encoded_name(self.name) @@ -267,5 +205,4 @@ class StagedClaimtrieItem(typing.NamedTuple): ) delete_claim_ops = delete_prefix(db, DB_PREFIXES.claim_to_txo.value + self.claim_hash) delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash) - invalidate_channel_ops = self.get_invalidate_channel_ops(db) - return delete_short_id_ops + delete_claim_ops + delete_supports_ops + invalidate_channel_ops + return delete_short_id_ops + delete_claim_ops + delete_supports_ops + self.get_invalidate_channel_ops(db) diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index c34bbb0e5..c33b1371a 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -3,6 +3,10 @@ import struct from lbry.wallet.server.db import DB_PREFIXES +ACTIVATED_CLAIM_TXO_TYPE = 1 +ACTIVATED_SUPPORT_TXO_TYPE = 2 + + def length_encoded_name(name: str) -> bytes: encoded = name.encode('utf-8') return len(encoded).to_bytes(2, byteorder='big') + encoded @@ -12,6 +16,11 @@ class PrefixRow: prefix: bytes key_struct: struct.Struct value_struct: struct.Struct + key_part_lambdas = [] + + @classmethod + def pack_partial_key(cls, *args) -> bytes: + return cls.prefix + cls.key_part_lambdas[len(args)](*args) @classmethod def pack_key(cls, *args) -> bytes: @@ -35,20 +44,6 @@ class PrefixRow: return cls.unpack_key(key), cls.unpack_value(value) -class EffectiveAmountKey(typing.NamedTuple): - name: str - effective_amount: int - tx_num: int - position: int - - -class EffectiveAmountValue(typing.NamedTuple): - claim_hash: bytes - root_tx_num: int - root_position: int - activation: int - - class ClaimToTXOKey(typing.NamedTuple): claim_hash: bytes tx_num: int @@ -59,7 +54,7 @@ class ClaimToTXOValue(typing.NamedTuple): root_tx_num: int root_position: int amount: int - activation: int + # activation: int name: str @@ -83,7 +78,6 @@ class ClaimShortIDKey(typing.NamedTuple): class ClaimShortIDValue(typing.NamedTuple): tx_num: int position: int - activation: int class ClaimToChannelKey(typing.NamedTuple): @@ -97,14 +91,12 @@ class ClaimToChannelValue(typing.NamedTuple): class ChannelToClaimKey(typing.NamedTuple): signing_hash: bytes name: str - effective_amount: int tx_num: int position: int class ChannelToClaimValue(typing.NamedTuple): claim_hash: bytes - claims_in_channel: int class ClaimToSupportKey(typing.NamedTuple): @@ -148,55 +140,92 @@ class ClaimTakeoverValue(typing.NamedTuple): class PendingActivationKey(typing.NamedTuple): height: int + txo_type: int tx_num: int position: int + @property + def is_support(self) -> bool: + return self.txo_type == ACTIVATED_SUPPORT_TXO_TYPE + + @property + def is_claim(self) -> bool: + return self.txo_type == ACTIVATED_CLAIM_TXO_TYPE + class PendingActivationValue(typing.NamedTuple): claim_hash: bytes name: str -class EffectiveAmountPrefixRow(PrefixRow): - prefix = DB_PREFIXES.claim_effective_amount_prefix.value - key_struct = struct.Struct(b'>QLH') - value_struct = struct.Struct(b'>20sLHL') +class ActivationKey(typing.NamedTuple): + txo_type: int + tx_num: int + position: int + + +class ActivationValue(typing.NamedTuple): + height: int + claim_hash: bytes + name: str + + +class ActiveAmountKey(typing.NamedTuple): + claim_hash: bytes + txo_type: int + activation_height: int + tx_num: int + position: int + + +class ActiveAmountValue(typing.NamedTuple): + amount: int + + +class ActiveAmountPrefixRow(PrefixRow): + prefix = DB_PREFIXES.active_amount.value + key_struct = struct.Struct(b'>20sBLLH') + value_struct = struct.Struct(b'>Q') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack, + struct.Struct(b'>20sB').pack, + struct.Struct(b'>20sBL').pack, + struct.Struct(b'>20sBLL').pack, + struct.Struct(b'>20sBLLH').pack + ] @classmethod - def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int): - return cls.prefix + length_encoded_name(name) + cls.key_struct.pack( - 0xffffffffffffffff - effective_amount, tx_num, position - ) + def pack_key(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int): + return super().pack_key(claim_hash, txo_type, activation_height, tx_num, position) @classmethod - def unpack_key(cls, key: bytes) -> EffectiveAmountKey: - assert key[:1] == cls.prefix - name_len = int.from_bytes(key[1:3], byteorder='big') - name = key[3:3 + name_len].decode() - ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[3 + name_len:]) - return EffectiveAmountKey( - name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position - ) + def unpack_key(cls, key: bytes) -> ActiveAmountKey: + return ActiveAmountKey(*super().unpack_key(key)) @classmethod - def unpack_value(cls, data: bytes) -> EffectiveAmountValue: - return EffectiveAmountValue(*super().unpack_value(data)) + def unpack_value(cls, data: bytes) -> ActiveAmountValue: + return ActiveAmountValue(*super().unpack_value(data)) @classmethod - def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int, activation: int) -> bytes: - return super().pack_value(claim_hash, root_tx_num, root_position, activation) + def pack_value(cls, amount: int) -> bytes: + return cls.value_struct.pack(amount) @classmethod - def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes, - root_tx_num: int, root_position: int, activation: int): - return cls.pack_key(name, effective_amount, tx_num, position), \ - cls.pack_value(claim_hash, root_tx_num, root_position, activation) + def pack_item(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int, amount: int): + return cls.pack_key(claim_hash, txo_type, activation_height, tx_num, position), cls.pack_value(amount) class ClaimToTXOPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_to_txo.value key_struct = struct.Struct(b'>20sLH') - value_struct = struct.Struct(b'>LHQL') + value_struct = struct.Struct(b'>LHQ') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack, + struct.Struct(b'>20sL').pack, + struct.Struct(b'>20sLH').pack + ] @classmethod def pack_key(cls, claim_hash: bytes, tx_num: int, position: int): @@ -214,20 +243,20 @@ class ClaimToTXOPrefixRow(PrefixRow): @classmethod def unpack_value(cls, data: bytes) -> ClaimToTXOValue: - root_tx_num, root_position, amount, activation = cls.value_struct.unpack(data[:18]) - name_len = int.from_bytes(data[18:20], byteorder='big') - name = data[20:20 + name_len].decode() - return ClaimToTXOValue(root_tx_num, root_position, amount, activation, name) + root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14]) + name_len = int.from_bytes(data[14:16], byteorder='big') + name = data[16:16 + name_len].decode() + return ClaimToTXOValue(root_tx_num, root_position, amount, name) @classmethod - def pack_value(cls, root_tx_num: int, root_position: int, amount: int, activation: int, name: str) -> bytes: - return cls.value_struct.pack(root_tx_num, root_position, amount, activation) + length_encoded_name(name) + def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes: + return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name) @classmethod def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int, - amount: int, activation: int, name: str): + amount: int, name: str): return cls.pack_key(claim_hash, tx_num, position), \ - cls.pack_value(root_tx_num, root_position, amount, activation, name) + cls.pack_value(root_tx_num, root_position, amount, name) class TXOToClaimPrefixRow(PrefixRow): @@ -260,18 +289,32 @@ class TXOToClaimPrefixRow(PrefixRow): cls.pack_value(claim_hash, name) +def shortid_key_helper(struct_fmt): + packer = struct.Struct(struct_fmt).pack + def wrapper(name, *args): + return length_encoded_name(name) + packer(*args) + return wrapper + + class ClaimShortIDPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_short_id_prefix.value key_struct = struct.Struct(b'>20sLH') - value_struct = struct.Struct(b'>LHL') + value_struct = struct.Struct(b'>LH') + key_part_lambdas = [ + lambda: b'', + length_encoded_name, + shortid_key_helper(b'>20s'), + shortid_key_helper(b'>20sL'), + shortid_key_helper(b'>20sLH'), + ] @classmethod def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int): return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position) @classmethod - def pack_value(cls, tx_num: int, position: int, activation: int): - return super().pack_value(tx_num, position, activation) + def pack_value(cls, tx_num: int, position: int): + return super().pack_value(tx_num, position) @classmethod def unpack_key(cls, key: bytes) -> ClaimShortIDKey: @@ -286,9 +329,9 @@ class ClaimShortIDPrefixRow(PrefixRow): @classmethod def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int, - tx_num: int, position: int, activation: int): + tx_num: int, position: int): return cls.pack_key(name, claim_hash, root_tx_num, root_position), \ - cls.pack_value(tx_num, position, activation) + cls.pack_value(tx_num, position) class ClaimToChannelPrefixRow(PrefixRow): @@ -317,15 +360,33 @@ class ClaimToChannelPrefixRow(PrefixRow): return cls.pack_key(claim_hash), cls.pack_value(signing_hash) +def channel_to_claim_helper(struct_fmt): + packer = struct.Struct(struct_fmt).pack + + def wrapper(signing_hash: bytes, name: str, *args): + return signing_hash + length_encoded_name(name) + packer(*args) + + return wrapper + + class ChannelToClaimPrefixRow(PrefixRow): prefix = DB_PREFIXES.channel_to_claim.value - key_struct = struct.Struct(b'>QLH') - value_struct = struct.Struct(b'>20sL') + key_struct = struct.Struct(b'>LH') + value_struct = struct.Struct(b'>20s') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack, + channel_to_claim_helper(b''), + channel_to_claim_helper(b'>s'), + channel_to_claim_helper(b'>L'), + channel_to_claim_helper(b'>LH'), + ] @classmethod - def pack_key(cls, signing_hash: bytes, name: str, effective_amount: int, tx_num: int, position: int): + def pack_key(cls, signing_hash: bytes, name: str, tx_num: int, position: int): return cls.prefix + signing_hash + length_encoded_name(name) + cls.key_struct.pack( - 0xffffffffffffffff - effective_amount, tx_num, position + tx_num, position ) @classmethod @@ -334,24 +395,24 @@ class ChannelToClaimPrefixRow(PrefixRow): signing_hash = key[1:21] name_len = int.from_bytes(key[21:23], byteorder='big') name = key[23:23 + name_len].decode() - ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[23 + name_len:]) + tx_num, position = cls.key_struct.unpack(key[23 + name_len:]) return ChannelToClaimKey( - signing_hash, name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position + signing_hash, name, tx_num, position ) @classmethod - def pack_value(cls, claim_hash: bytes, claims_in_channel: int) -> bytes: - return super().pack_value(claim_hash, claims_in_channel) + def pack_value(cls, claim_hash: bytes) -> bytes: + return super().pack_value(claim_hash) @classmethod def unpack_value(cls, data: bytes) -> ChannelToClaimValue: return ChannelToClaimValue(*cls.value_struct.unpack(data)) @classmethod - def pack_item(cls, signing_hash: bytes, name: str, effective_amount: int, tx_num: int, position: int, - claim_hash: bytes, claims_in_channel: int): - return cls.pack_key(signing_hash, name, effective_amount, tx_num, position), \ - cls.pack_value(claim_hash, claims_in_channel) + def pack_item(cls, signing_hash: bytes, name: str, tx_num: int, position: int, + claim_hash: bytes): + return cls.pack_key(signing_hash, name, tx_num, position), \ + cls.pack_value(claim_hash) class ClaimToSupportPrefixRow(PrefixRow): @@ -412,6 +473,12 @@ class ClaimExpirationPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_expiration.value key_struct = struct.Struct(b'>LLH') value_struct = struct.Struct(b'>20s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack, + struct.Struct(b'>LL').pack, + struct.Struct(b'>LLH').pack, + ] @classmethod def pack_key(cls, expiration: int, tx_num: int, position: int) -> bytes: @@ -469,13 +536,20 @@ class ClaimTakeoverPrefixRow(PrefixRow): return cls.pack_key(name), cls.pack_value(claim_hash, takeover_height) -class PendingClaimActivationPrefixRow(PrefixRow): +class PendingActivationPrefixRow(PrefixRow): prefix = DB_PREFIXES.pending_activation.value - key_struct = struct.Struct(b'>LLH') + key_struct = struct.Struct(b'>LBLH') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack, + struct.Struct(b'>LB').pack, + struct.Struct(b'>LBL').pack, + struct.Struct(b'>LBLH').pack + ] @classmethod - def pack_key(cls, height: int, tx_num: int, position: int): - return super().pack_key(height, tx_num, position) + def pack_key(cls, height: int, txo_type: int, tx_num: int, position: int): + return super().pack_key(height, txo_type, tx_num, position) @classmethod def unpack_key(cls, key: bytes) -> PendingActivationKey: @@ -493,11 +567,47 @@ class PendingClaimActivationPrefixRow(PrefixRow): return PendingActivationValue(claim_hash, name) @classmethod - def pack_item(cls, height: int, tx_num: int, position: int, claim_hash: bytes, name: str): - return cls.pack_key(height, tx_num, position), \ + def pack_item(cls, height: int, txo_type: int, tx_num: int, position: int, claim_hash: bytes, name: str): + return cls.pack_key(height, txo_type, tx_num, position), \ cls.pack_value(claim_hash, name) +class ActivatedPrefixRow(PrefixRow): + prefix = DB_PREFIXES.activated_claim_and_support.value + key_struct = struct.Struct(b'>BLH') + value_struct = struct.Struct(b'>L20s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>B').pack, + struct.Struct(b'>BL').pack, + struct.Struct(b'>BLH').pack + ] + + @classmethod + def pack_key(cls, txo_type: int, tx_num: int, position: int): + return super().pack_key(txo_type, tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> ActivationKey: + return ActivationKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, height: int, claim_hash: bytes, name: str) -> bytes: + return cls.value_struct.pack(height, claim_hash) + length_encoded_name(name) + + @classmethod + def unpack_value(cls, data: bytes) -> ActivationValue: + height, claim_hash = cls.value_struct.unpack(data[:24]) + name_len = int.from_bytes(data[24:26], byteorder='big') + name = data[26:26 + name_len].decode() + return ActivationValue(height, claim_hash, name) + + @classmethod + def pack_item(cls, txo_type: int, tx_num: int, position: int, height: int, claim_hash: bytes, name: str): + return cls.pack_key(txo_type, tx_num, position), \ + cls.pack_value(height, claim_hash, name) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -509,10 +619,11 @@ class Prefixes: channel_to_claim = ChannelToClaimPrefixRow claim_short_id = ClaimShortIDPrefixRow - claim_effective_amount = EffectiveAmountPrefixRow claim_expiration = ClaimExpirationPrefixRow claim_takeover = ClaimTakeoverPrefixRow - pending_activation = PendingClaimActivationPrefixRow + pending_activation = PendingActivationPrefixRow + activated = ActivatedPrefixRow + active_amount = ActiveAmountPrefixRow # undo_claimtrie = b'M' diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index e081000a3..bfcdb4f5e 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -18,7 +18,7 @@ import struct import attr import zlib import base64 -from typing import Optional, Iterable +from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from functools import partial from asyncio import sleep from bisect import bisect_right, bisect_left @@ -37,8 +37,9 @@ from lbry.wallet.server.storage import db_class from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue -from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name -from lbry.wallet.server.db.claimtrie import get_expiration_height, get_delay_for_name +from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE +from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue +from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, length_encoded_name from lbry.wallet.server.db.elasticsearch import SearchIndex @@ -56,17 +57,6 @@ TXO_STRUCT_unpack = TXO_STRUCT.unpack TXO_STRUCT_pack = TXO_STRUCT.pack -HISTORY_PREFIX = b'A' -TX_PREFIX = b'B' -BLOCK_HASH_PREFIX = b'C' -HEADER_PREFIX = b'H' -TX_NUM_PREFIX = b'N' -TX_COUNT_PREFIX = b'T' -UNDO_PREFIX = b'U' -TX_HASH_PREFIX = b'X' -HASHX_UTXO_PREFIX = b'h' -UTXO_PREFIX = b'u' -HASHX_HISTORY_PREFIX = b'x' @attr.s(slots=True) @@ -188,12 +178,22 @@ class LevelDB: # Search index self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout) - def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int): + def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]: claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx)) if not claim_hash_and_name: return return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name) + def get_activation(self, tx_num, position, is_support=False) -> int: + activation = self.db.get( + Prefixes.activated.pack_key( + ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position + ) + ) + if activation: + return Prefixes.activated.unpack_value(activation).height + return -1 + def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]: key = Prefixes.support_to_claim.pack_key(tx_num, position) supported_claim_hash = self.db.get(key) @@ -228,7 +228,7 @@ class LevelDB: created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = controlling_claim.height - expiration_height = get_expiration_height(height) + expiration_height = self.coin.get_expiration_height(height) support_amount = self.get_support_amount(claim_hash) claim_amount = self.get_claim_txo_amount(claim_hash, tx_num, position) @@ -239,7 +239,7 @@ class LevelDB: short_url = f'{name}#{claim_hash.hex()}' canonical_url = short_url if channel_hash: - channel_vals = self.get_root_claim_txo_and_current_amount(channel_hash) + channel_vals = self.get_claim_txo(channel_hash) if channel_vals: channel_name = channel_vals[1].name claims_in_channel = self.get_claims_in_channel_count(channel_hash) @@ -260,11 +260,13 @@ class LevelDB: :param claim_id: partial or complete claim id :param amount_order: '$' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided """ - if not amount_order and not claim_id: + if (not amount_order and not claim_id) or amount_order == 1: # winning resolution controlling = self.get_controlling_claim(normalized_name) if not controlling: + print("none controlling") return + print("resolved controlling", controlling.claim_hash.hex()) return self._fs_get_claim_by_hash(controlling.claim_hash) encoded_name = length_encoded_name(normalized_name) @@ -279,7 +281,7 @@ class LevelDB: claim_txo = Prefixes.claim_short_id.unpack_value(v) return self._prepare_resolve_result( claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name, key.root_tx_num, - key.root_position, claim_txo.activation + key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position) ) return @@ -302,8 +304,9 @@ class LevelDB: for k, v in self.db.iterator(prefix=prefix): key = Prefixes.channel_to_claim.unpack_key(k) stream = Prefixes.channel_to_claim.unpack_value(v) - if not candidates or candidates[-1][-1] == key.effective_amount: - candidates.append((stream.claim_hash, key.tx_num, key.position, key.effective_amount)) + effective_amount = self.get_effective_amount(stream.claim_hash) + if not candidates or candidates[-1][-1] == effective_amount: + candidates.append((stream.claim_hash, key.tx_num, key.position, effective_amount)) else: break if not candidates: @@ -347,12 +350,13 @@ class LevelDB: return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url) def _fs_get_claim_by_hash(self, claim_hash): - for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash): + for k, v in self.db.iterator(prefix=Prefixes.claim_to_txo.pack_partial_key(claim_hash)): unpacked_k = Prefixes.claim_to_txo.unpack_key(k) unpacked_v = Prefixes.claim_to_txo.unpack_value(v) + activation_height = self.get_activation(unpacked_k.tx_num, unpacked_k.position) return self._prepare_resolve_result( unpacked_k.tx_num, unpacked_k.position, unpacked_k.claim_hash, unpacked_v.name, - unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.activation + unpacked_v.root_tx_num, unpacked_v.root_position, activation_height ) async def fs_getclaimbyid(self, claim_id): @@ -360,19 +364,8 @@ class LevelDB: self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id) ) - def claim_exists(self, claim_hash: bytes): - for _ in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_value=False): - return True - return False - - def get_root_claim_txo_and_current_amount(self, claim_hash): - for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash): - unpacked_k = Prefixes.claim_to_txo.unpack_key(k) - unpacked_v = Prefixes.claim_to_txo.unpack_value(v) - return unpacked_k, unpacked_v - def make_staged_claim_item(self, claim_hash: bytes) -> Optional[StagedClaimtrieItem]: - claim_info = self.get_root_claim_txo_and_current_amount(claim_hash) + claim_info = self.get_claim_txo(claim_hash) k, v = claim_info root_tx_num = v.root_tx_num root_idx = v.root_position @@ -381,16 +374,14 @@ class LevelDB: tx_num = k.tx_num idx = k.position height = bisect_right(self.tx_counts, tx_num) - effective_amount = self.get_support_amount(claim_hash) + value signing_hash = self.get_channel_for_claim(claim_hash) - activation_height = v.activation - if signing_hash: - count = self.get_claims_in_channel_count(signing_hash) - else: - count = 0 + # if signing_hash: + # count = self.get_claims_in_channel_count(signing_hash) + # else: + # count = 0 return StagedClaimtrieItem( - name, claim_hash, value, effective_amount, activation_height, get_expiration_height(height), tx_num, idx, - root_tx_num, root_idx, signing_hash, count + name, claim_hash, value, self.coin.get_expiration_height(height), tx_num, idx, + root_tx_num, root_idx, signing_hash ) def get_claim_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: @@ -398,58 +389,57 @@ class LevelDB: if v: return Prefixes.claim_to_txo.unpack_value(v).amount - def get_claim_from_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: + def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: + v = self.db.get(Prefixes.claim_to_support.pack_key(claim_hash, tx_num, position)) + if v: + return Prefixes.claim_to_support.unpack_value(v).amount + + def get_claim_txo(self, claim_hash: bytes) -> Optional[Tuple[ClaimToTXOKey, ClaimToTXOValue]]: assert claim_hash - for v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_key=False): - return Prefixes.claim_to_txo.unpack_value(v) + for k, v in self.db.iterator(prefix=Prefixes.claim_to_txo.pack_partial_key(claim_hash)): + return Prefixes.claim_to_txo.unpack_key(k), Prefixes.claim_to_txo.unpack_value(v) - def get_claim_amount(self, claim_hash: bytes) -> Optional[int]: - claim = self.get_claim_from_txo(claim_hash) - if claim: - return claim.amount - - def get_effective_amount(self, claim_hash: bytes): - return (self.get_claim_amount(claim_hash) or 0) + self.get_support_amount(claim_hash) - - def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: int): - claim_info = self.get_root_claim_txo_and_current_amount(claim_hash) - if not claim_info: - return [] - - root_tx_num = claim_info[1].root_tx_num - root_position = claim_info[1].root_position - amount = claim_info[1].amount - name = claim_info[1].name - tx_num = claim_info[0].tx_num - position = claim_info[0].position - activation = claim_info[1].activation - signing_hash = self.get_channel_for_claim(claim_hash) - claims_in_channel_count = None - if signing_hash: - claims_in_channel_count = self.get_claims_in_channel_count(signing_hash) - prev_effective_amount = self.get_effective_amount(claim_hash) - return get_update_effective_amount_ops( - name, effective_amount, prev_effective_amount, tx_num, position, - root_tx_num, root_position, claim_hash, activation, activation, signing_hash, - claims_in_channel_count + def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: + return sum( + Prefixes.active_amount.unpack_value(v).amount + for v in self.db.iterator(start=Prefixes.active_amount.pack_partial_key( + claim_hash, txo_type, 0), stop=Prefixes.active_amount.pack_partial_key( + claim_hash, txo_type, height), include_key=False) ) + def get_effective_amount(self, claim_hash: bytes, support_only=False) -> int: + support_amount = self._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.db_height + 1) + if support_only: + return support_only + return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) + + def get_claims_for_name(self, name): + claims = [] + for _k, _v in self.db.iterator(prefix=Prefixes.claim_short_id.pack_partial_key(name)): + k, v = Prefixes.claim_short_id.unpack_key(_k), Prefixes.claim_short_id.unpack_value(_v) + # claims[v.claim_hash] = (k, v) + if k.claim_hash not in claims: + claims.append(k.claim_hash) + return claims + def get_claims_in_channel_count(self, channel_hash) -> int: - for v in self.db.iterator(prefix=DB_PREFIXES.channel_to_claim.value + channel_hash, include_key=False): - return Prefixes.channel_to_claim.unpack_value(v).claims_in_channel - return 0 + count = 0 + for _ in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False): + count += 1 + return count def get_channel_for_claim(self, claim_hash) -> Optional[bytes]: - return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash) + return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash)) - def get_expired_by_height(self, height: int): + def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]: expired = {} - for _k, _v in self.db.iterator(prefix=DB_PREFIXES.claim_expiration.value + struct.pack(b'>L', height)): + for _k, _v in self.db.iterator(prefix=Prefixes.claim_expiration.pack_partial_key(height)): k, v = Prefixes.claim_expiration.unpack_item(_k, _v) tx_hash = self.total_transactions[k.tx_num] tx = self.coin.transaction(self.db.get(DB_PREFIXES.TX_PREFIX.value + tx_hash)) # treat it like a claim spend so it will delete/abandon properly # the _spend_claim function this result is fed to expects a txi, so make a mock one + print(f"\texpired lbry://{v.name} {v.claim_hash.hex()}") expired[v.claim_hash] = ( k.tx_num, k.position, v.name, TxInput(prev_hash=tx_hash, prev_idx=k.position, script=tx.outputs[k.position].pk_script, sequence=0) @@ -462,28 +452,21 @@ class LevelDB: return return Prefixes.claim_takeover.unpack_value(controlling) - def get_claims_for_name(self, name: str): - claim_hashes = set() - for k in self.db.iterator(prefix=Prefixes.claim_short_id.prefix + length_encoded_name(name), - include_value=False): - claim_hashes.add(Prefixes.claim_short_id.unpack_key(k).claim_hash) - return claim_hashes + def get_claim_txos_for_name(self, name: str): + txos = {} + for k, v in self.db.iterator(prefix=Prefixes.claim_short_id.pack_partial_key(name)): + claim_hash = Prefixes.claim_short_id.unpack_key(k).claim_hash + tx_num, nout = Prefixes.claim_short_id.unpack_value(v) + txos[claim_hash] = tx_num, nout + return txos - def get_activated_claims_at_height(self, height: int) -> typing.Set[PendingActivationValue]: - claims = set() - prefix = Prefixes.pending_activation.prefix + height.to_bytes(4, byteorder='big') - for _v in self.db.iterator(prefix=prefix, include_key=False): + def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: + activated = defaultdict(list) + for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)): + k = Prefixes.pending_activation.unpack_key(_k) v = Prefixes.pending_activation.unpack_value(_v) - claims.add(v) - return claims - - def get_activation_delay(self, claim_hash: bytes, name: str) -> int: - controlling = self.get_controlling_claim(name) - if not controlling: - return 0 - if claim_hash == controlling.claim_hash: - return 0 - return get_delay_for_name(self.db_height - controlling.height) + activated[v].append(k) + return activated async def _read_tx_counts(self): if self.tx_counts is not None: @@ -494,7 +477,7 @@ class LevelDB: def get_counts(): return tuple( util.unpack_be_uint64(tx_count) - for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + for tx_count in self.db.iterator(prefix=DB_PREFIXES.TX_COUNT_PREFIX.value, include_key=False) ) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) @@ -509,7 +492,7 @@ class LevelDB: async def _read_txids(self): def get_txids(): - return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) + return list(self.db.iterator(prefix=DB_PREFIXES.TX_HASH_PREFIX.value, include_key=False)) start = time.perf_counter() self.logger.info("loading txids") @@ -528,7 +511,7 @@ class LevelDB: def get_headers(): return [ - header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False) + header for header in self.db.iterator(prefix=DB_PREFIXES.HEADER_PREFIX.value, include_key=False) ] headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 49eda2fe0..bbfcee30a 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -1,6 +1,7 @@ import asyncio import json import hashlib +from bisect import bisect_right from binascii import hexlify, unhexlify from lbry.testcase import CommandTestCase from lbry.wallet.transaction import Transaction, Output @@ -43,35 +44,52 @@ class BaseResolveTestCase(CommandTestCase): async def assertMatchClaim(self, claim_id): expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) - resolved, _ = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) - print(expected) - print(resolved) - self.assertDictEqual({ - 'claim_id': expected['claimId'], - 'activation_height': expected['validAtHeight'], - 'last_takeover_height': expected['lastTakeoverHeight'], - 'txid': expected['txId'], - 'nout': expected['n'], - 'amount': expected['amount'], - 'effective_amount': expected['effectiveAmount'] - }, { - 'claim_id': resolved.claim_hash.hex(), - 'activation_height': resolved.activation_height, - 'last_takeover_height': resolved.last_takeover_height, - 'txid': resolved.tx_hash[::-1].hex(), - 'nout': resolved.position, - 'amount': resolved.amount, - 'effective_amount': resolved.effective_amount - }) - return resolved + claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) + if not expected: + self.assertIsNone(claim) + return + self.assertEqual(expected['claimId'], claim.claim_hash.hex()) + self.assertEqual(expected['validAtHeight'], claim.activation_height) + self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height) + self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex()) + self.assertEqual(expected['n'], claim.position) + self.assertEqual(expected['amount'], claim.amount) + self.assertEqual(expected['effectiveAmount'], claim.effective_amount) + return claim async def assertMatchClaimIsWinning(self, name, claim_id): self.assertEqual(claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) await self.assertMatchClaim(claim_id) + await self.assertMatchClaimsForName(name) + + async def assertMatchClaimsForName(self, name): + expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name)) + + print(len(expected['claims']), 'from lbrycrd for ', name) + + db = self.conductor.spv_node.server.bp.db + + def check_supports(claim_id, lbrycrd_supports): + for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): + support = lbrycrd_supports[i] + self.assertEqual(support['txId'], db.total_transactions[tx_num][::-1].hex()) + self.assertEqual(support['n'], position) + self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num)) + self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True)) + + # self.assertEqual(len(expected['claims']), len(db_claims.claims)) + # self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight) + + for c in expected['claims']: + check_supports(c['claimId'], c['supports']) + claim_hash = bytes.fromhex(c['claimId']) + self.assertEqual(c['validAtHeight'], db.get_activation( + db.total_transactions.index(bytes.fromhex(c['txId'])[::-1]), c['n'] + )) + self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash)) class ResolveCommand(BaseResolveTestCase): - async def test_resolve_response(self): channel_id = self.get_claim_id( await self.channel_create('@abc', '0.01') @@ -170,6 +188,7 @@ class ResolveCommand(BaseResolveTestCase): await self.stream_create('foo', '0.9', allow_duplicate_name=True)) # plain winning claim await self.assertResolvesToClaimId('foo', claim_id3) + # amount order resolution await self.assertResolvesToClaimId('foo$1', claim_id3) await self.assertResolvesToClaimId('foo$2', claim_id2) @@ -275,9 +294,7 @@ class ResolveCommand(BaseResolveTestCase): winner_id = self.get_claim_id(c) # winning_one = await self.check_lbrycrd_winning(one) - winning_two = await self.assertMatchWinningClaim(two) - - self.assertEqual(winner_id, winning_two.claim_hash.hex()) + await self.assertMatchClaimIsWinning(two, winner_id) r1 = await self.resolve(f'lbry://{one}') r2 = await self.resolve(f'lbry://{two}') @@ -385,24 +402,37 @@ class ResolveCommand(BaseResolveTestCase): class ResolveClaimTakeovers(BaseResolveTestCase): - async def test_activation_delay(self): + async def _test_activation_delay(self): name = 'derp' # initially claim the name - first_claim_id = (await self.stream_create(name, '0.1'))['outputs'][0]['claim_id'] - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + first_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(320) # a claim of higher amount made now will have a takeover delay of 10 second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] # sanity check self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(9) # not yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(1) # the new claim should have activated - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) + return first_claim_id, second_claim_id + + async def test_activation_delay(self): + await self._test_activation_delay() + + async def test_activation_delay_then_abandon_then_reclaim(self): + name = 'derp' + first_claim_id, second_claim_id = await self._test_activation_delay() + await self.daemon.jsonrpc_txo_spend(type='stream', claim_id=first_claim_id) + await self.daemon.jsonrpc_txo_spend(type='stream', claim_id=second_claim_id) + await self.generate(1) + await self.assertNoClaimForName(name) + await self._test_activation_delay() async def test_block_takeover_with_delay_1_support(self): name = 'derp' @@ -415,46 +445,46 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # sanity check self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(8) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) # prevent the takeover by adding a support one block before the takeover happens await self.support_create(first_claim_id, bid='1.0') # one more block until activation await self.generate(1) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) async def test_block_takeover_with_delay_0_support(self): name = 'derp' # initially claim the name first_claim_id = (await self.stream_create(name, '0.1'))['outputs'][0]['claim_id'] - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(320) # a claim of higher amount made now will have a takeover delay of 10 second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] # sanity check - self.assertNotEqual(first_claim_id, second_claim_id) + await self.assertMatchClaimIsWinning(name, first_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(9) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) # prevent the takeover by adding a support on the same block the takeover would happen await self.support_create(first_claim_id, bid='1.0') - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) async def _test_almost_prevent_takeover(self, name: str, blocks: int = 9): # initially claim the name first_claim_id = (await self.stream_create(name, '0.1'))['outputs'][0]['claim_id'] - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(320) # a claim of higher amount made now will have a takeover delay of 10 second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] # sanity check self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(blocks) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) # prevent the takeover by adding a support on the same block the takeover would happen tx = await self.daemon.jsonrpc_support_create(first_claim_id, '1.0') await self.ledger.wait(tx) @@ -465,7 +495,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): first_claim_id, second_claim_id, tx = await self._test_almost_prevent_takeover(name, 9) await self.daemon.jsonrpc_txo_spend(type='support', txid=tx.id) await self.generate(1) - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) async def test_almost_prevent_takeover_remove_support_one_block_after_supported(self): name = 'derp' @@ -473,35 +503,35 @@ class ResolveClaimTakeovers(BaseResolveTestCase): await self.generate(1) await self.daemon.jsonrpc_txo_spend(type='support', txid=tx.id) await self.generate(1) - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) async def test_abandon_before_takeover(self): name = 'derp' # initially claim the name first_claim_id = (await self.stream_create(name, '0.1'))['outputs'][0]['claim_id'] - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(320) # a claim of higher amount made now will have a takeover delay of 10 second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] # sanity check self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(8) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) # abandon the winning claim await self.daemon.jsonrpc_txo_spend(type='stream', claim_id=first_claim_id) await self.generate(1) # the takeover and activation should happen a block earlier than they would have absent the abandon - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) await self.generate(1) - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) async def test_abandon_before_takeover_no_delay_update(self): # TODO: fix race condition line 506 name = 'derp' # initially claim the name first_claim_id = (await self.stream_create(name, '0.1'))['outputs'][0]['claim_id'] - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(320) # block 527 # a claim of higher amount made now will have a takeover delay of 10 @@ -510,19 +540,23 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # sanity check self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.assertMatchClaimsForName(name) await self.generate(8) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.assertMatchClaimsForName(name) # abandon the winning claim await self.daemon.jsonrpc_txo_spend(type='stream', claim_id=first_claim_id) await self.daemon.jsonrpc_stream_update(second_claim_id, '0.1') await self.generate(1) # the takeover and activation should happen a block earlier than they would have absent the abandon - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) + await self.assertMatchClaimsForName(name) await self.generate(1) # await self.ledger.on_header.where(lambda e: e.height == 537) - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) + await self.assertMatchClaimsForName(name) async def test_abandon_controlling_support_before_pending_takeover(self): name = 'derp' @@ -533,54 +567,78 @@ class ResolveClaimTakeovers(BaseResolveTestCase): self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) await self.generate(321) - second_claim_id = (await self.stream_create(name, '1.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] + second_claim_id = (await self.stream_create(name, '0.9', allow_duplicate_name=True))['outputs'][0]['claim_id'] + self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(8) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) # abandon the support that causes the winning claim to have the highest staked tx = await self.daemon.jsonrpc_txo_spend(type='support', txid=controlling_support_tx.id) await self.generate(1) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) + # await self.assertMatchClaim(second_claim_id) + await self.generate(1) - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + + await self.assertMatchClaim(first_claim_id) + await self.assertMatchClaimIsWinning(name, second_claim_id) async def test_remove_controlling_support(self): name = 'derp' # initially claim the name - first_claim_id = (await self.stream_create(name, '0.1'))['outputs'][0]['claim_id'] + first_claim_id = (await self.stream_create(name, '0.2'))['outputs'][0]['claim_id'] first_support_tx = await self.daemon.jsonrpc_support_create(first_claim_id, '0.9') await self.ledger.wait(first_support_tx) - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, first_claim_id) - await self.generate(321) # give the first claim long enough for a 10 block takeover delay + await self.generate(320) # give the first claim long enough for a 10 block takeover delay + await self.assertMatchClaimIsWinning(name, first_claim_id) # make a second claim which will take over the name - second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] - second_claim_support_tx = await self.daemon.jsonrpc_support_create(second_claim_id, '1.0') - await self.ledger.wait(second_claim_support_tx) + second_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] self.assertNotEqual(first_claim_id, second_claim_id) + second_claim_support_tx = await self.daemon.jsonrpc_support_create(second_claim_id, '1.5') + await self.ledger.wait(second_claim_support_tx) + await self.generate(1) # neither the second claim or its support have activated yet + await self.assertMatchClaimIsWinning(name, first_claim_id) - # the name resolves to the first claim - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) - await self.generate(9) - # still resolves to the first claim - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) - await self.generate(1) # second claim takes over - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) - await self.generate(33) # give the second claim long enough for a 1 block takeover delay - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) - # abandon the support that causes the winning claim to have the highest staked - await self.daemon.jsonrpc_txo_spend(type='support', txid=second_claim_support_tx.id) + await self.generate(9) # claim activates, but is not yet winning + await self.assertMatchClaimIsWinning(name, first_claim_id) + + await self.generate(1) # support activates, takeover happens + await self.assertMatchClaimIsWinning(name, second_claim_id) + + await self.daemon.jsonrpc_txo_spend(type='support', claim_id=second_claim_id, blocking=True) + await self.generate(1) # support activates, takeover happens + await self.assertMatchClaimIsWinning(name, first_claim_id) + + async def test_claim_expiration(self): + name = 'derp' + # starts at height 206 + vanishing_claim = (await self.stream_create('vanish', '0.1'))['outputs'][0]['claim_id'] + + await self.generate(493) + # in block 701 and 702 + first_claim_id = (await self.stream_create(name, '0.3'))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning('vanish', vanishing_claim) + await self.generate(100) # block 801, expiration fork happened + await self.assertNoClaimForName('vanish') + # second claim is in block 802 + second_claim_id = (await self.stream_create(name, '0.2', allow_duplicate_name=True))['outputs'][0]['claim_id'] + await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.generate(498) + await self.assertMatchClaimIsWinning(name, first_claim_id) await self.generate(1) - self.assertEqual(second_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) - await self.generate(1) # first claim takes over - self.assertEqual(first_claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) + await self.assertMatchClaimIsWinning(name, second_claim_id) + await self.generate(100) + await self.assertMatchClaimIsWinning(name, second_claim_id) + await self.generate(1) + await self.assertNoClaimForName(name) class ResolveAfterReorg(BaseResolveTestCase): - async def reorg(self, start): blocks = self.ledger.headers.height - start self.blockchain.block_expected = start - 1