From 234c03db09cd73b8eff7c1c91f89f40c208ddd5c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 12 Aug 2021 16:08:52 -0400 Subject: [PATCH] fix claims not having non-normalized names --- lbry/wallet/server/block_processor.py | 96 ++++++++++--------- lbry/wallet/server/db/claimtrie.py | 15 +-- lbry/wallet/server/db/common.py | 1 + .../server/db/elasticsearch/constants.py | 11 ++- lbry/wallet/server/db/elasticsearch/search.py | 8 +- lbry/wallet/server/db/prefixes.py | 46 ++++++--- lbry/wallet/server/leveldb.py | 58 +++++------ .../blockchain/test_resolve_command.py | 32 +++++-- 8 files changed, 157 insertions(+), 110 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 4eb99fb8f..a2c9045a1 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -407,10 +407,11 @@ class BlockProcessor: def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]): + claim_name = txo.script.values['claim_name'].decode() try: - claim_name = txo.normalized_name + normalized_name = txo.normalized_name except UnicodeDecodeError: - claim_name = ''.join(chr(c) for c in txo.script.values['claim_name']) + normalized_name = claim_name if txo.script.is_claim_name: claim_hash = hash160(tx_hash + pack('>I', nout))[::-1] # print(f"\tnew {claim_hash.hex()} ({tx_num} {txo.amount})") @@ -478,7 +479,7 @@ class BlockProcessor: if claim_hash not in spent_claims: # print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}") return - if claim_name != spent_claims[claim_hash][2]: + if normalized_name != spent_claims[claim_hash][2]: self.logger.warning( f"{tx_hash[::-1].hex()} contains mismatched name for claim update {claim_hash.hex()}" ) @@ -493,9 +494,10 @@ class BlockProcessor: previous_claim = self._make_pending_claim_txo(claim_hash) root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position activation = self.db.get_activation(prev_tx_num, prev_idx) + claim_name = previous_claim.name self.db_op_stack.extend_ops( StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, claim_name, + ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, normalized_name, previous_claim.amount ).get_remove_activate_ops() ) @@ -506,8 +508,8 @@ class BlockProcessor: self.db.txo_to_claim[(tx_num, nout)] = claim_hash pending = StagedClaimtrieItem( - claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, - root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash + claim_name, normalized_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, + root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash ) self.txo_to_claim[(tx_num, nout)] = pending self.claim_hash_to_txo[claim_hash] = (tx_num, nout) @@ -575,7 +577,7 @@ class BlockProcessor: self.pending_reposted.add(spent.reposted_claim_hash) if spent.signing_hash and spent.channel_signature_is_valid: self.pending_channel_counts[spent.signing_hash] -= 1 - spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name) + spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name) # print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops()) return True @@ -584,14 +586,14 @@ class BlockProcessor: if not self._spend_claim_txo(txin, spent_claims): self._spend_support_txo(txin) - def _abandon_claim(self, claim_hash, tx_num, nout, name): + def _abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str): if (tx_num, nout) in self.txo_to_claim: pending = self.txo_to_claim.pop((tx_num, nout)) self.claim_hash_to_txo.pop(claim_hash) self.abandoned_claims[pending.claim_hash] = pending claim_root_tx_num, claim_root_idx = pending.root_tx_num, pending.root_position prev_amount, prev_signing_hash = pending.amount, pending.signing_hash - reposted_claim_hash = pending.reposted_claim_hash + reposted_claim_hash, name = pending.reposted_claim_hash, pending.name expiration = self.coin.get_expiration_height(self.height) signature_is_valid = pending.channel_signature_is_valid else: @@ -599,12 +601,12 @@ class BlockProcessor: claim_hash ) claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount - signature_is_valid = v.channel_signature_is_valid + signature_is_valid, name = v.channel_signature_is_valid, v.name prev_signing_hash = self.db.get_channel_for_claim(claim_hash, tx_num, nout) reposted_claim_hash = self.db.get_repost(claim_hash) expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num)) self.abandoned_claims[claim_hash] = staged = StagedClaimtrieItem( - name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num, + name, normalized_name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num, claim_root_idx, signature_is_valid, prev_signing_hash, reposted_claim_hash ) if prev_signing_hash and prev_signing_hash in self.pending_channel_counts: @@ -614,8 +616,7 @@ class BlockProcessor: self.support_txo_to_claim.pop(support_txo_to_clear) self.support_txos_by_claim[claim_hash].clear() self.support_txos_by_claim.pop(claim_hash) - - if name.startswith('@'): # abandon a channel, invalidate signatures + if normalized_name.startswith('@'): # abandon a channel, invalidate signatures self._invalidate_channel_signatures(claim_hash) def _invalidate_channel_signatures(self, claim_hash: bytes): @@ -660,7 +661,7 @@ class BlockProcessor: signing_hash = self.db.get_channel_for_claim(claim_hash, claim.tx_num, claim.position) reposted_claim_hash = self.db.get_repost(claim_hash) return StagedClaimtrieItem( - claim.name, claim_hash, claim.amount, + claim.name, claim.normalized_name, claim_hash, claim.amount, self.coin.get_expiration_height( bisect_right(self.db.tx_counts, claim.tx_num), extended=self.height >= self.coin.nExtendedClaimExpirationForkHeight @@ -680,19 +681,19 @@ class BlockProcessor: # abandon the channels last to handle abandoned signed claims in the same tx, # see test_abandon_channel_and_claims_in_same_tx expired_channels = {} - for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items(): - self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + for abandoned_claim_hash, (tx_num, nout, normalized_name) in spent_claims.items(): + self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) - if name.startswith('@'): - expired_channels[abandoned_claim_hash] = (tx_num, nout, name) + if normalized_name.startswith('@'): + expired_channels[abandoned_claim_hash] = (tx_num, nout, normalized_name) else: # print(f"\texpire {abandoned_claim_hash.hex()} {tx_num} {nout}") - self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) # do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned - for abandoned_claim_hash, (tx_num, nout, name) in expired_channels.items(): + for abandoned_claim_hash, (tx_num, nout, normalized_name) in expired_channels.items(): # print(f"\texpire {abandoned_claim_hash.hex()} {tx_num} {nout}") - self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) def _cached_get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: if (claim_hash, txo_type, height) in self.amount_cache: @@ -717,10 +718,10 @@ class BlockProcessor: def _get_pending_claim_name(self, claim_hash: bytes) -> Optional[str]: assert claim_hash is not None if claim_hash in self.claim_hash_to_txo: - return self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].name + return self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].normalized_name claim_info = self.db.get_claim_txo(claim_hash) if claim_info: - return claim_info.name + return claim_info.normalized_name def _get_pending_supported_amount(self, claim_hash: bytes, height: Optional[int] = None) -> int: amount = self._cached_get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, height or (self.height + 1)) @@ -799,9 +800,9 @@ class BlockProcessor: # determine names needing takeover/deletion due to controlling claims being abandoned # and add ops to deactivate abandoned claims for claim_hash, staged in self.abandoned_claims.items(): - controlling = get_controlling(staged.name) + controlling = get_controlling(staged.normalized_name) if controlling and controlling.claim_hash == claim_hash: - names_with_abandoned_controlling_claims.append(staged.name) + names_with_abandoned_controlling_claims.append(staged.normalized_name) # print(f"\t{staged.name} needs takeover") activation = self.db.get_activation(staged.tx_num, staged.position) if activation > 0: # db returns -1 for non-existent txos @@ -809,7 +810,7 @@ class BlockProcessor: self.db_op_stack.extend_ops( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, - activation, staged.name, staged.amount + activation, staged.normalized_name, staged.amount ).get_remove_activate_ops() ) else: @@ -830,7 +831,8 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.txo_to_claim.items(): self.db_op_stack.extend_ops(get_delayed_activate_ops( - staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False + staged.normalized_name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, + is_support=False )) # and the supports @@ -838,7 +840,7 @@ class BlockProcessor: if claim_hash in self.abandoned_claims: continue elif claim_hash in self.claim_hash_to_txo: - name = self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].name + name = self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].normalized_name staged_is_new_claim = not self.txo_to_claim[self.claim_hash_to_txo[claim_hash]].is_update else: supported_claim_info = self.db.get_claim_txo(claim_hash) @@ -847,7 +849,7 @@ class BlockProcessor: continue else: v = supported_claim_info - name = v.name + name = v.normalized_name staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position) self.db_op_stack.extend_ops(get_delayed_activate_ops( name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True @@ -855,7 +857,7 @@ class BlockProcessor: # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): - controlling = get_controlling(activated.name) + controlling = get_controlling(activated.normalized_name) if activated.claim_hash in self.abandoned_claims: continue reactivate = False @@ -864,7 +866,7 @@ class BlockProcessor: reactivate = True for activated_txo in activated_txos: if activated_txo.is_support and (activated_txo.tx_num, activated_txo.position) in \ - self.removed_support_txos_by_name_by_claim[activated.name][activated.claim_hash]: + self.removed_support_txos_by_name_by_claim[activated.normalized_name][activated.claim_hash]: # print("\tskip activate support for pending abandoned claim") continue if activated_txo.is_claim: @@ -876,7 +878,7 @@ class BlockProcessor: amount = self.db.get_claim_txo_amount( activated.claim_hash ) - self.activated_claim_amount_by_name_and_hash[(activated.name, activated.claim_hash)] = amount + self.activated_claim_amount_by_name_and_hash[(activated.normalized_name, activated.claim_hash)] = amount else: txo_type = ACTIVATED_SUPPORT_TXO_TYPE txo_tup = (activated_txo.tx_num, activated_txo.position) @@ -890,7 +892,7 @@ class BlockProcessor: # print("\tskip activate support for non existent claim") continue self.activated_support_amount_by_claim[activated.claim_hash].append(amount) - self.activation_by_claim_by_name[activated.name][activated.claim_hash].append((activated_txo, amount)) + self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount)) # print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} " # f"{activated.claim_hash.hex()} @ {activated_txo.height}") @@ -933,14 +935,14 @@ class BlockProcessor: for activated, activated_claim_txo in self.db.get_future_activated(height): # uses the pending effective amount for the future activation height, not the current height future_amount = self._get_pending_claim_amount( - activated.name, activated.claim_hash, activated_claim_txo.height + 1 + activated.normalized_name, activated.claim_hash, activated_claim_txo.height + 1 ) if activated.claim_hash not in claim_exists: claim_exists[activated.claim_hash] = activated.claim_hash in self.claim_hash_to_txo or ( self.db.get_claim_txo(activated.claim_hash) is not None) if claim_exists[activated.claim_hash] and activated.claim_hash not in self.abandoned_claims: v = future_amount, activated, activated_claim_txo - future_activations[activated.name][activated.claim_hash] = v + future_activations[activated.normalized_name][activated.claim_hash] = v for name, future_activated in activate_in_future.items(): for claim_hash, activated in future_activated.items(): @@ -1115,17 +1117,17 @@ class BlockProcessor: removed_claim = self.db.get_claim_txo(removed) if removed_claim: amt = self.db.get_url_effective_amount( - removed_claim.name, removed + removed_claim.normalized_name, removed ) if amt: self.db_op_stack.extend_ops(get_remove_effective_amount_ops( - removed_claim.name, amt.effective_amount, amt.tx_num, + removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position, removed )) for touched in self.touched_claim_hashes: if touched in self.claim_hash_to_txo: pending = self.txo_to_claim[self.claim_hash_to_txo[touched]] - name, tx_num, position = pending.name, pending.tx_num, pending.position + name, tx_num, position = pending.normalized_name, pending.tx_num, pending.position claim_from_db = self.db.get_claim_txo(touched) if claim_from_db: claim_amount_info = self.db.get_url_effective_amount(name, touched) @@ -1138,7 +1140,7 @@ class BlockProcessor: v = self.db.get_claim_txo(touched) if not v: continue - name, tx_num, position = v.name, v.tx_num, v.position + name, tx_num, position = v.normalized_name, v.tx_num, v.position amt = self.db.get_url_effective_amount(name, touched) if amt: self.db_op_stack.extend_ops(get_remove_effective_amount_ops( @@ -1215,16 +1217,16 @@ class BlockProcessor: abandoned_channels = {} # abandon the channels last to handle abandoned signed claims in the same tx, # see test_abandon_channel_and_claims_in_same_tx - for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items(): - if name.startswith('@'): - abandoned_channels[abandoned_claim_hash] = (tx_num, nout, name) + for abandoned_claim_hash, (tx_num, nout, normalized_name) in spent_claims.items(): + if normalized_name.startswith('@'): + abandoned_channels[abandoned_claim_hash] = (tx_num, nout, normalized_name) else: - # print(f"\tabandon {name} {abandoned_claim_hash.hex()} {tx_num} {nout}") - self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + # print(f"\tabandon {normalized_name} {abandoned_claim_hash.hex()} {tx_num} {nout}") + self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) - for abandoned_claim_hash, (tx_num, nout, name) in abandoned_channels.items(): - # print(f"\tabandon {name} {abandoned_claim_hash.hex()} {tx_num} {nout}") - self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + for abandoned_claim_hash, (tx_num, nout, normalized_name) in abandoned_channels.items(): + # print(f"\tabandon {normalized_name} {abandoned_claim_hash.hex()} {tx_num} {nout}") + self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) self.db.total_transactions.append(tx_hash) self.db.transaction_num_mapping[tx_hash] = tx_count diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 4c688ada8..54a65484d 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -128,6 +128,7 @@ def get_add_effective_amount_ops(name: str, effective_amount: int, tx_num: int, class StagedClaimtrieItem(typing.NamedTuple): name: str + normalized_name: str claim_hash: bytes amount: int expiration_height: int @@ -161,13 +162,13 @@ class StagedClaimtrieItem(typing.NamedTuple): ), # claim hash by txo op( - *Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name) + *Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.normalized_name) ), # claim expiration op( *Prefixes.claim_expiration.pack_item( self.expiration_height, self.tx_num, self.position, self.claim_hash, - self.name + self.normalized_name ) ), # short url resolution @@ -175,7 +176,7 @@ class StagedClaimtrieItem(typing.NamedTuple): ops.extend([ op( *Prefixes.claim_short_id.pack_item( - self.name, self.claim_hash.hex()[:prefix_len + 1], self.root_tx_num, self.root_position, + self.normalized_name, self.claim_hash.hex()[:prefix_len + 1], self.root_tx_num, self.root_position, self.tx_num, self.position ) ) for prefix_len in range(10) @@ -192,7 +193,7 @@ class StagedClaimtrieItem(typing.NamedTuple): # stream by channel op( *Prefixes.channel_to_claim.pack_item( - self.signing_hash, self.name, self.tx_num, self.position, self.claim_hash + self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash ) ) ]) @@ -231,7 +232,7 @@ class StagedClaimtrieItem(typing.NamedTuple): # delete channel_to_claim/claim_to_channel RevertableDelete( *Prefixes.channel_to_claim.pack_item( - self.signing_hash, self.name, self.tx_num, self.position, self.claim_hash + self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash ) ), # update claim_to_txo with channel_signature_is_valid=False @@ -252,6 +253,6 @@ class StagedClaimtrieItem(typing.NamedTuple): def invalidate_signature(self) -> 'StagedClaimtrieItem': return StagedClaimtrieItem( - self.name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, self.position, - self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash + self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, + self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash ) diff --git a/lbry/wallet/server/db/common.py b/lbry/wallet/server/db/common.py index 53a265363..dce98711d 100644 --- a/lbry/wallet/server/db/common.py +++ b/lbry/wallet/server/db/common.py @@ -424,6 +424,7 @@ INDEXED_LANGUAGES = [ class ResolveResult(typing.NamedTuple): name: str + normalized_name: str claim_hash: bytes tx_num: int position: int diff --git a/lbry/wallet/server/db/elasticsearch/constants.py b/lbry/wallet/server/db/elasticsearch/constants.py index f20cf822f..a210af46d 100644 --- a/lbry/wallet/server/db/elasticsearch/constants.py +++ b/lbry/wallet/server/db/elasticsearch/constants.py @@ -38,7 +38,7 @@ INDEX_DEFAULT_SETTINGS = { FIELDS = { '_id', - 'claim_id', 'claim_type', 'claim_name', 'normalized_name', + 'claim_id', 'claim_type', 'name', 'normalized', 'tx_id', 'tx_nout', 'tx_position', 'short_url', 'canonical_url', 'is_controlling', 'last_take_over_height', @@ -56,9 +56,10 @@ FIELDS = { 'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'tx_num' } -TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', 'censoring_channel_id', - 'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', - 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'} +TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', + 'media_type', 'normalized', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', + 'name', 'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', + 'tags'} RANGE_FIELDS = { 'height', 'creation_height', 'activation_height', 'expiration_height', @@ -72,7 +73,7 @@ RANGE_FIELDS = { ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS REPLACEMENTS = { - 'name': 'normalized_name', + # 'name': 'normalized_name', 'txid': 'tx_id', 'nout': 'tx_nout', 'valid_channel_signature': 'is_signature_valid', diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 4a8f309d8..0379ec090 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -205,7 +205,8 @@ class SearchIndex: total_referenced.extend(response) response = [ ResolveResult( - name=r['claim_name'], + name=r['name'], + normalized_name=r['normalized'], claim_hash=r['claim_hash'], tx_num=r['tx_num'], position=r['tx_nout'], @@ -230,7 +231,8 @@ class SearchIndex: ] extra = [ ResolveResult( - name=r['claim_name'], + name=r['name'], + normalized_name=r['normalized'], claim_hash=r['claim_hash'], tx_num=r['tx_num'], position=r['tx_nout'], @@ -647,7 +649,7 @@ def expand_result(results): result['tx_hash'] = unhexlify(result['tx_id'])[::-1] result['reposted'] = result.pop('repost_count') result['signature_valid'] = result.pop('is_signature_valid') - result['normalized'] = result.pop('normalized_name') + # result['normalized'] = result.pop('normalized_name') # if result['censoring_channel_hash']: # result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1] expanded.append(result) diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 3b9108da4..8ed55e96f 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -4,7 +4,7 @@ import array import base64 from typing import Union, Tuple, NamedTuple from lbry.wallet.server.db import DB_PREFIXES - +from lbry.schema.url import normalize_name ACTIVATED_CLAIM_TXO_TYPE = 1 ACTIVATED_SUPPORT_TXO_TYPE = 2 @@ -19,7 +19,18 @@ def length_prefix(key: str) -> bytes: return len(key).to_bytes(1, byteorder='big') + key.encode() -class PrefixRow: +_ROW_TYPES = {} + + +class PrefixRowType(type): + def __new__(cls, name, bases, kwargs): + klass = super().__new__(cls, name, bases, kwargs) + if name != "PrefixRow": + _ROW_TYPES[klass.prefix] = klass + return klass + + +class PrefixRow(metaclass=PrefixRowType): prefix: bytes key_struct: struct.Struct value_struct: struct.Struct @@ -175,6 +186,13 @@ class ClaimToTXOValue(typing.NamedTuple): channel_signature_is_valid: bool name: str + @property + def normalized_name(self) -> str: + try: + return normalize_name(self.name) + except UnicodeDecodeError: + return self.name + class TXOToClaimKey(typing.NamedTuple): tx_num: int @@ -190,13 +208,14 @@ class TXOToClaimValue(typing.NamedTuple): class ClaimShortIDKey(typing.NamedTuple): - name: str + normalized_name: str partial_claim_id: str root_tx_num: int root_position: int def __str__(self): - return f"{self.__class__.__name__}(name={self.name}, partial_claim_id={self.partial_claim_id}, " \ + return f"{self.__class__.__name__}(normalized_name={self.normalized_name}, " \ + f"partial_claim_id={self.partial_claim_id}, " \ f"root_tx_num={self.root_tx_num}, root_position={self.root_position})" @@ -274,14 +293,14 @@ class ClaimExpirationKey(typing.NamedTuple): class ClaimExpirationValue(typing.NamedTuple): claim_hash: bytes - name: str + normalized_name: str def __str__(self): - return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, name={self.name})" + return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, normalized_name={self.normalized_name})" class ClaimTakeoverKey(typing.NamedTuple): - name: str + normalized_name: str class ClaimTakeoverValue(typing.NamedTuple): @@ -309,10 +328,10 @@ class PendingActivationKey(typing.NamedTuple): class PendingActivationValue(typing.NamedTuple): claim_hash: bytes - name: str + normalized_name: str def __str__(self): - return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, name={self.name})" + return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, normalized_name={self.normalized_name})" class ActivationKey(typing.NamedTuple): @@ -324,10 +343,11 @@ class ActivationKey(typing.NamedTuple): class ActivationValue(typing.NamedTuple): height: int claim_hash: bytes - name: str + normalized_name: str def __str__(self): - return f"{self.__class__.__name__}(height={self.height}, claim_hash={self.claim_hash.hex()}, name={self.name})" + return f"{self.__class__.__name__}(height={self.height}, claim_hash={self.claim_hash.hex()}, " \ + f"normalized_name={self.normalized_name})" class ActiveAmountKey(typing.NamedTuple): @@ -347,7 +367,7 @@ class ActiveAmountValue(typing.NamedTuple): class EffectiveAmountKey(typing.NamedTuple): - name: str + normalized_name: str effective_amount: int tx_num: int position: int @@ -1345,6 +1365,6 @@ ROW_TYPES = { def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: try: - return ROW_TYPES[key[:1]].unpack_item(key, value) + return _ROW_TYPES[key[:1]].unpack_item(key, value) except KeyError: return key, value diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f61bdb915..88c6e8bae 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -27,7 +27,7 @@ from collections import defaultdict, OrderedDict from lbry.error import ResolveCensoredError from lbry.schema.result import Censor from lbry.utils import LRUCacheWithMetrics -from lbry.schema.url import URL +from lbry.schema.url import URL, normalize_name from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str from lbry.wallet.server.tx import TxInput @@ -218,10 +218,11 @@ class LevelDB: supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount)) return supports - def get_short_claim_id_url(self, name: str, claim_hash: bytes, root_tx_num: int, root_position: int) -> str: + def get_short_claim_id_url(self, name: str, normalized_name: str, claim_hash: bytes, + root_tx_num: int, root_position: int) -> str: claim_id = claim_hash.hex() for prefix_len in range(10): - prefix = Prefixes.claim_short_id.pack_partial_key(name, claim_id[:prefix_len+1]) + prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:prefix_len+1]) for _k in self.db.iterator(prefix=prefix, include_value=False): k = Prefixes.claim_short_id.unpack_key(_k) if k.root_tx_num == root_tx_num and k.root_position == root_position: @@ -230,9 +231,14 @@ class LevelDB: print(f"{claim_id} has a collision") return f'{name}#{claim_id}' - def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, root_tx_num: int, - root_position: int, activation_height: int, signature_valid: bool) -> ResolveResult: - controlling_claim = self.get_controlling_claim(name) + def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, + root_tx_num: int, root_position: int, activation_height: int, + signature_valid: bool) -> ResolveResult: + try: + normalized_name = normalize_name(name) + except UnicodeDecodeError: + normalized_name = name + controlling_claim = self.get_controlling_claim(normalized_name) tx_hash = self.total_transactions[tx_num] height = bisect_right(self.tx_counts, tx_num) @@ -246,18 +252,19 @@ class LevelDB: effective_amount = support_amount + claim_amount channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) reposted_claim_hash = self.get_repost(claim_hash) - short_url = self.get_short_claim_id_url(name, claim_hash, root_tx_num, root_position) + short_url = self.get_short_claim_id_url(name, normalized_name, claim_hash, root_tx_num, root_position) canonical_url = short_url claims_in_channel = self.get_claims_in_channel_count(claim_hash) if channel_hash: channel_vals = self.claim_to_txo.get(channel_hash) if channel_vals: channel_short_url = self.get_short_claim_id_url( - channel_vals.name, channel_hash, channel_vals.root_tx_num, channel_vals.root_position + channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num, + channel_vals.root_position ) canonical_url = f'{channel_short_url}/{short_url}' return ResolveResult( - name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url, + name, normalized_name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url, is_controlling=controlling_claim.claim_hash == claim_hash, canonical_url=canonical_url, last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel, creation_height=created_height, activation_height=activation_height, @@ -288,7 +295,7 @@ class LevelDB: if claim_id: if len(claim_id) == 40: # a full claim id claim_txo = self.get_claim_txo(bytes.fromhex(claim_id)) - if not claim_txo or normalized_name != claim_txo.name: + if not claim_txo or normalized_name != claim_txo.normalized_name: return return self._prepare_resolve_result( claim_txo.tx_num, claim_txo.position, bytes.fromhex(claim_id), claim_txo.name, @@ -303,7 +310,7 @@ class LevelDB: claim_hash = self.txo_to_claim[(claim_txo.tx_num, claim_txo.position)] signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid return self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, key.name, key.root_tx_num, + claim_txo.tx_num, claim_txo.position, claim_hash, key.normalized_name, key.root_tx_num, key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position), signature_is_valid ) @@ -319,7 +326,7 @@ class LevelDB: claim_txo = self.claim_to_txo.get(claim_val.claim_hash) activation = self.get_activation(key.tx_num, key.position) return self._prepare_resolve_result( - key.tx_num, key.position, claim_val.claim_hash, key.name, claim_txo.root_tx_num, + key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num, claim_txo.root_position, activation, claim_txo.channel_signature_is_valid ) return @@ -472,7 +479,7 @@ class LevelDB: reposts = self.get_reposts_in_channel(reposter_channel_hash) for repost in reposts: txo = self.get_claim_txo(repost) - if txo.name.startswith('@'): + if txo.normalized_name.startswith('@'): channels[repost] = reposter_channel_hash else: streams[repost] = reposter_channel_hash @@ -495,12 +502,12 @@ class LevelDB: for _k, _v in self.db.iterator(prefix=Prefixes.claim_expiration.pack_partial_key(height)): k, v = Prefixes.claim_expiration.unpack_item(_k, _v) tx_hash = self.total_transactions[k.tx_num] - tx = self.coin.transaction(self.db.get(DB_PREFIXES.tx.value + tx_hash)) + tx = self.coin.transaction(self.db.get(Prefixes.tx.pack_key(tx_hash))) # treat it like a claim spend so it will delete/abandon properly # the _spend_claim function this result is fed to expects a txi, so make a mock one # print(f"\texpired lbry://{v.name} {v.claim_hash.hex()}") expired[v.claim_hash] = ( - k.tx_num, k.position, v.name, + k.tx_num, k.position, v.normalized_name, TxInput(prev_hash=tx_hash, prev_idx=k.position, script=tx.outputs[k.position].pk_script, sequence=0) ) return expired @@ -520,14 +527,12 @@ class LevelDB: return txos def get_claim_metadata(self, tx_hash, nout): - raw = self.db.get( - DB_PREFIXES.tx.value + tx_hash - ) + raw = self.db.get(Prefixes.tx.pack_key(tx_hash)) try: output = self.coin.transaction(raw).outputs[nout] script = OutputScript(output.pk_script) script.parse() - return Claim.from_bytes(script.values['claim']), ''.join(chr(c) for c in script.values['claim_name']) + return Claim.from_bytes(script.values['claim']) except: self.logger.error( "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(), @@ -546,7 +551,7 @@ class LevelDB: metadata = self.get_claim_metadata(claim.tx_hash, claim.position) if not metadata: return - metadata, non_normalized_name = metadata + metadata = metadata if not metadata.is_stream or not metadata.stream.has_fee: fee_amount = 0 else: @@ -565,7 +570,6 @@ class LevelDB: ) if not reposted_metadata: return - reposted_metadata, _ = reposted_metadata reposted_tags = [] reposted_languages = [] reposted_has_source = False @@ -577,9 +581,7 @@ class LevelDB: reposted_duration = None if reposted_claim: reposted_tx_hash = self.total_transactions[reposted_claim.tx_num] - raw_reposted_claim_tx = self.db.get( - DB_PREFIXES.tx.value + reposted_tx_hash - ) + raw_reposted_claim_tx = self.db.get(Prefixes.tx.pack_key(reposted_tx_hash)) try: reposted_claim_txo = self.coin.transaction( raw_reposted_claim_tx @@ -652,8 +654,8 @@ class LevelDB: reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash) value = { 'claim_id': claim_hash.hex(), - 'claim_name': non_normalized_name, - 'normalized_name': claim.name, + 'name': claim.name, + 'normalized': claim.normalized_name, 'tx_id': claim.tx_hash[::-1].hex(), 'tx_num': claim.tx_num, 'tx_nout': claim.position, @@ -715,7 +717,7 @@ class LevelDB: batch = [] for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix): # TODO: fix the couple of claim txos that dont have controlling names - if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)): + if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).normalized_name)): continue claim = self._fs_get_claim_by_hash(claim_hash[1:]) if claim: @@ -740,7 +742,7 @@ class LevelDB: if claim_hash not in self.claim_to_txo: self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue - name = self.claim_to_txo[claim_hash].name + name = self.claim_to_txo[claim_hash].normalized_name if not self.db.get(Prefixes.claim_takeover.pack_key(name)): self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 25284a432..3c60748be 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -352,19 +352,37 @@ class ResolveCommand(BaseResolveTestCase): one = 'ΣίσυφοςfiÆ' two = 'ΣΊΣΥΦΟσFIæ' - _ = await self.stream_create(one, '0.1') - c = await self.stream_create(two, '0.2') + c1 = await self.stream_create(one, '0.1') + c2 = await self.stream_create(two, '0.2') - winner_id = self.get_claim_id(c) + loser_id = self.get_claim_id(c1) + winner_id = self.get_claim_id(c2) # winning_one = await self.check_lbrycrd_winning(one) await self.assertMatchClaimIsWinning(two, winner_id) - r1 = await self.resolve(f'lbry://{one}') - r2 = await self.resolve(f'lbry://{two}') + claim1 = await self.resolve(f'lbry://{one}') + claim2 = await self.resolve(f'lbry://{two}') + claim3 = await self.resolve(f'lbry://{one}:{winner_id[:5]}') + claim4 = await self.resolve(f'lbry://{two}:{winner_id[:5]}') - self.assertEqual(winner_id, r1['claim_id']) - self.assertEqual(winner_id, r2['claim_id']) + claim5 = await self.resolve(f'lbry://{one}:{loser_id[:5]}') + claim6 = await self.resolve(f'lbry://{two}:{loser_id[:5]}') + + self.assertEqual(winner_id, claim1['claim_id']) + self.assertEqual(winner_id, claim2['claim_id']) + self.assertEqual(winner_id, claim3['claim_id']) + self.assertEqual(winner_id, claim4['claim_id']) + + self.assertEqual(two, claim1['name']) + self.assertEqual(two, claim2['name']) + self.assertEqual(two, claim3['name']) + self.assertEqual(two, claim4['name']) + + self.assertEqual(loser_id, claim5['claim_id']) + self.assertEqual(loser_id, claim6['claim_id']) + self.assertEqual(one, claim5['name']) + self.assertEqual(one, claim6['name']) async def test_resolve_old_claim(self): channel = await self.daemon.jsonrpc_channel_create('@olds', '1.0')