From 6ea96e79bd81a821ce415c6b9d9babf318aefa6b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 2 Jun 2021 11:00:27 -0400 Subject: [PATCH] reposts --- lbry/schema/result.py | 10 +- lbry/wallet/server/block_processor.py | 185 +++++++++++++----- lbry/wallet/server/db/__init__.py | 4 + lbry/wallet/server/db/claimtrie.py | 16 +- lbry/wallet/server/db/common.py | 1 + lbry/wallet/server/db/elasticsearch/search.py | 8 +- lbry/wallet/server/db/prefixes.py | 79 ++++++++ lbry/wallet/server/leveldb.py | 40 ++-- .../blockchain/test_claim_commands.py | 6 +- 9 files changed, 260 insertions(+), 89 deletions(-) diff --git a/lbry/schema/result.py b/lbry/schema/result.py index b2c3b83a5..d9da9911f 100644 --- a/lbry/schema/result.py +++ b/lbry/schema/result.py @@ -120,10 +120,10 @@ class Outputs: 'expiration_height': claim.expiration_height, 'effective_amount': claim.effective_amount, 'support_amount': claim.support_amount, - 'trending_group': claim.trending_group, - 'trending_mixed': claim.trending_mixed, - 'trending_local': claim.trending_local, - 'trending_global': claim.trending_global, + # 'trending_group': claim.trending_group, + # 'trending_mixed': claim.trending_mixed, + # 'trending_local': claim.trending_local, + # 'trending_global': claim.trending_global, } if claim.HasField('channel'): txo.channel = tx_map[claim.channel.tx_hash].outputs[claim.channel.nout] @@ -210,7 +210,7 @@ class Outputs: txo_message.nout = resolve_result.position txo_message.height = resolve_result.height txo_message.claim.short_url = resolve_result.short_url - txo_message.claim.reposted = 0 + txo_message.claim.reposted = resolve_result.reposted txo_message.claim.is_controlling = resolve_result.is_controlling txo_message.claim.creation_height = resolve_result.creation_height txo_message.claim.activation_height = resolve_result.activation_height diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index cd730212c..6c9234f8f 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -9,9 +9,10 @@ from prometheus_client import Gauge, Histogram from collections import defaultdict import lbry from lbry.schema.claim import Claim +from lbry.schema.mime_types import guess_stream_type from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger from lbry.wallet.constants import TXO_TYPES -from lbry.wallet.server.db.common import STREAM_TYPES +from lbry.wallet.server.db.common import STREAM_TYPES, CLAIM_TYPES from lbry.wallet.transaction import OutputScript, Output from lbry.wallet.server.tx import Tx, TxOutput, TxInput @@ -213,7 +214,7 @@ class BlockProcessor: # is consistent with self.height self.state_lock = asyncio.Lock() - self.search_cache = {} + # self.search_cache = {} self.history_cache = {} self.status_server = StatusServer() @@ -251,32 +252,98 @@ class BlockProcessor: self.removed_claims_to_send_es = set() self.touched_claims_to_send_es = set() + self.pending_reposted_count = set() + def claim_producer(self): + def get_claim_txo(tx_hash, nout): + raw = self.db.db.get( + DB_PREFIXES.TX_PREFIX.value + tx_hash + ) + try: + output: TxOutput = self.coin.transaction(raw).outputs[nout] + script = OutputScript(output.pk_script) + script.parse() + return Claim.from_bytes(script.values['claim']) + except: + self.logger.exception( + "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(), + raw.hex() + ) + return + if self.db.db_height <= 1: return + + to_send_es = set(self.touched_claims_to_send_es) + to_send_es.update(self.pending_reposted_count.difference(self.removed_claims_to_send_es)) + for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() - for claim_hash in self.touched_claims_to_send_es: + for claim_hash in to_send_es: claim = self.db._fs_get_claim_by_hash(claim_hash) - raw_claim_tx = self.db.db.get(DB_PREFIXES.TX_PREFIX.value + claim.tx_hash) - try: - claim_txo: TxOutput = self.coin.transaction(raw_claim_tx).outputs[claim.position] - script = OutputScript(claim_txo.pk_script) - script.parse() - except: - self.logger.exception( - "tx parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex() - ) + metadata = get_claim_txo(claim.tx_hash, claim.position) + if not metadata: continue - try: - metadata = Claim.from_bytes(script.values['claim']) - except: - self.logger.exception( - "claim parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex() + reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1] + reposted_claim = None + reposted_metadata = None + if reposted_claim_hash: + reposted_claim = self.db.get_claim_txo(reposted_claim_hash) + if not reposted_claim: + continue + reposted_metadata = get_claim_txo( + self.db.total_transactions[reposted_claim[0].tx_num], reposted_claim[0].position ) - continue + if not reposted_metadata: + continue + reposted_tags = [] + reposted_languages = [] + reposted_has_source = None + reposted_claim_type = None + if reposted_claim: + reposted_tx_hash = self.db.total_transactions[reposted_claim[0].tx_num] + raw_reposted_claim_tx = self.db.db.get( + DB_PREFIXES.TX_PREFIX.value + reposted_tx_hash + ) + try: + reposted_claim_txo: TxOutput = self.coin.transaction( + raw_reposted_claim_tx + ).outputs[reposted_claim[0].position] + reposted_script = OutputScript(reposted_claim_txo.pk_script) + reposted_script.parse() + except: + self.logger.exception( + "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(), + raw_reposted_claim_tx.hex() + ) + continue + try: + reposted_metadata = Claim.from_bytes(reposted_script.values['claim']) + except: + self.logger.exception( + "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(), + raw_reposted_claim_tx.hex() + ) + continue + if reposted_metadata: + reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags] + reposted_languages = [] if not reposted_metadata.is_stream else ( + [lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none'] + ) + reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source + reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type] + claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags] + claim_languages = [] if not metadata.is_stream else ( + [lang.language or 'none' for lang in metadata.stream.languages] or ['none'] + ) + tags = list(set(claim_tags).union(set(reposted_tags))) + languages = list(set(claim_languages).union(set(reposted_languages))) + canonical_url = f'{claim.name}#{claim.claim_hash.hex()}' + if metadata.is_signed: + channel_txo = self.db.get_claim_txo(metadata.signing_channel_hash[::-1]) + canonical_url = f'{channel_txo[1].name}#{metadata.signing_channel_hash[::-1].hex()}/{canonical_url}' - yield ('update', { + value = { 'claim_hash': claim_hash[::-1], # 'claim_id': claim_hash.hex(), 'claim_name': claim.name, @@ -285,8 +352,8 @@ class BlockProcessor: 'tx_num': claim.tx_num, 'tx_nout': claim.position, 'amount': claim.amount, - 'timestamp': 0, - 'creation_timestamp': 0, + 'timestamp': 0, # TODO: fix + 'creation_timestamp': 0, # TODO: fix 'height': claim.height, 'creation_height': claim.creation_height, 'activation_height': claim.activation_height, @@ -296,25 +363,24 @@ class BlockProcessor: 'is_controlling': claim.is_controlling, 'last_take_over_height': claim.last_takeover_height, - 'short_url': '', - 'canonical_url': '', + 'short_url': f'{claim.name}#{claim.claim_hash.hex()}', # TODO: fix + 'canonical_url': canonical_url, - 'release_time': None if not metadata.is_stream else metadata.stream.release_time, 'title': None if not metadata.is_stream else metadata.stream.title, 'author': None if not metadata.is_stream else metadata.stream.author, 'description': None if not metadata.is_stream else metadata.stream.description, - 'claim_type': TXO_TYPES[metadata.claim_type], + 'claim_type': CLAIM_TYPES[metadata.claim_type], 'has_source': None if not metadata.is_stream else metadata.stream.has_source, - 'stream_type': None if not metadata.is_stream else STREAM_TYPES.get(metadata.stream.stream_type, None), + 'stream_type': None if not metadata.is_stream else STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)], 'media_type': None if not metadata.is_stream else metadata.stream.source.media_type, - 'fee_amount': None if not metadata.is_stream else metadata.stream.fee.amount, + 'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(max(metadata.stream.fee.amount or 0, 0)*1000), 'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency, - 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration), + # 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration), - 'reposted': 0, - 'reposted_claim_hash': None, - 'reposted_claim_type': None, - 'reposted_has_source': False, + 'reposted': self.db.get_reposted_count(claim_hash), + 'reposted_claim_hash': reposted_claim_hash, + 'reposted_claim_type': reposted_claim_type, + 'reposted_has_source': reposted_has_source, 'channel_hash': metadata.signing_channel_hash, @@ -323,21 +389,25 @@ class BlockProcessor: self.ledger.public_key_to_address(metadata.channel.public_key_bytes) ), 'signature': metadata.signature, - 'signature_digest': None, - 'signature_valid': False, - 'claims_in_channel': 0, + 'signature_digest': None, # TODO: fix + 'signature_valid': False, # TODO: fix + 'claims_in_channel': 0, # TODO: fix - 'tags': [] if not metadata.is_stream else [tag for tag in metadata.stream.tags], - 'languages': [] if not metadata.is_stream else ( - [lang.language or 'none' for lang in metadata.stream.languages] or ['none'] - ), - 'censor_type': 0, - 'censoring_channel_hash': None, + 'tags': tags, + 'languages': languages, + 'censor_type': 0, # TODO: fix + 'censoring_channel_hash': None, # TODO: fix # 'trending_group': 0, # 'trending_mixed': 0, # 'trending_local': 0, # 'trending_global': 0, - }) + } + if metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration): + value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration + if metadata.is_stream and metadata.stream.release_time: + value['release_time'] = metadata.stream.release_time + + yield ('update', value) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -368,17 +438,20 @@ class BlockProcessor: try: for block in blocks: await self.run_in_thread_with_lock(self.advance_block, block) + # TODO: we shouldnt wait on the search index updating before advancing to the next block await self.db.search_index.claim_consumer(self.claim_producer()) + self.db.search_index.clear_caches() self.touched_claims_to_send_es.clear() self.removed_claims_to_send_es.clear() + self.pending_reposted_count.clear() print("******************\n") except: self.logger.exception("advance blocks failed") raise # if self.sql: - for cache in self.search_cache.values(): - cache.clear() + # for cache in self.search_cache.values(): + # cache.clear() self.history_cache.clear() # TODO: is this needed? self.notifications.notified_mempool_txs.clear() @@ -535,11 +608,16 @@ class BlockProcessor: ops = [] signing_channel_hash = None + reposted_claim_hash = None + if txo.claim.is_repost: + reposted_claim_hash = txo.claim.repost.reference.claim_hash[::-1] + self.pending_reposted_count.add(reposted_claim_hash) + if signable and signable.signing_channel_hash: signing_channel_hash = txo.signable.signing_channel_hash[::-1] - if txo.script.is_claim_name: + if txo.script.is_claim_name: # it's a root claim root_tx_num, root_idx = tx_num, nout - else: + else: # it's a claim update if claim_hash not in spent_claims: print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}") return [] @@ -561,7 +639,7 @@ class BlockProcessor: ) pending = StagedClaimtrieItem( claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, - root_idx, signing_channel_hash + root_idx, signing_channel_hash, reposted_claim_hash ) self.pending_claims[(tx_num, nout)] = pending self.pending_claim_txos[claim_hash] = (tx_num, nout) @@ -625,11 +703,14 @@ class BlockProcessor: claim_hash = spent_claim_hash_and_name.claim_hash signing_hash = self.db.get_channel_for_claim(claim_hash) k, v = self.db.get_claim_txo(claim_hash) + reposted_claim_hash = self.db.get_repost(claim_hash) spent = StagedClaimtrieItem( v.name, claim_hash, v.amount, self.coin.get_expiration_height(bisect_right(self.db.tx_counts, txin_num)), - txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash + txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash, reposted_claim_hash ) + if spent.reposted_claim_hash: + self.pending_reposted_count.add(spent.reposted_claim_hash) spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name) print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") return spent.get_spend_claim_txo_ops() @@ -646,6 +727,7 @@ class BlockProcessor: self.staged_pending_abandoned[pending.claim_hash] = pending claim_root_tx_num, claim_root_idx = pending.root_claim_tx_num, pending.root_claim_tx_position prev_amount, prev_signing_hash = pending.amount, pending.signing_hash + reposted_claim_hash = pending.reposted_claim_hash expiration = self.coin.get_expiration_height(self.height) else: k, v = self.db.get_claim_txo( @@ -653,10 +735,11 @@ class BlockProcessor: ) claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount prev_signing_hash = self.db.get_channel_for_claim(claim_hash) + reposted_claim_hash = self.db.get_repost(claim_hash) expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num)) self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem( name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num, - claim_root_idx, prev_signing_hash + claim_root_idx, prev_signing_hash, reposted_claim_hash ) self.pending_supports[claim_hash].clear() @@ -1216,8 +1299,8 @@ class BlockProcessor: self.possible_future_activated_support.clear() self.possible_future_support_txos.clear() - for cache in self.search_cache.values(): - cache.clear() + # for cache in self.search_cache.values(): + # cache.clear() self.history_cache.clear() self.notifications.notified_mempool_txs.clear() diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index befa3f3a2..5384043d2 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -1,6 +1,7 @@ import enum +@enum.unique class DB_PREFIXES(enum.Enum): claim_to_support = b'K' support_to_claim = b'L' @@ -20,6 +21,9 @@ class DB_PREFIXES(enum.Enum): activated_claim_and_support = b'R' active_amount = b'S' + repost = b'V' + reposted_claim = b'W' + undo_claimtrie = b'M' HISTORY_PREFIX = b'A' diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 4c8aa301c..526721f37 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -3,7 +3,7 @@ from typing import Optional from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow -from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE +from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, RepostPrefixRow, RepostedPrefixRow def length_encoded_name(name: str) -> bytes: @@ -137,6 +137,7 @@ class StagedClaimtrieItem(typing.NamedTuple): root_claim_tx_num: int root_claim_tx_position: int signing_hash: Optional[bytes] + reposted_claim_hash: Optional[bytes] @property def is_update(self) -> bool: @@ -191,6 +192,16 @@ class StagedClaimtrieItem(typing.NamedTuple): ) ) ]) + if self.reposted_claim_hash: + ops.extend([ + op( + *RepostPrefixRow.pack_item(self.claim_hash, self.reposted_claim_hash) + ), + op( + *RepostedPrefixRow.pack_item(self.reposted_claim_hash, self.tx_num, self.position, self.claim_hash) + ), + + ]) return ops def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]: @@ -207,9 +218,8 @@ class StagedClaimtrieItem(typing.NamedTuple): ] + delete_prefix(db, DB_PREFIXES.channel_to_claim.value + self.signing_hash) def get_abandon_ops(self, db) -> typing.List[RevertableOp]: - packed_name = length_encoded_name(self.name) delete_short_id_ops = delete_prefix( - db, DB_PREFIXES.claim_short_id_prefix.value + packed_name + self.claim_hash + db, Prefixes.claim_short_id.pack_partial_key(self.name, self.claim_hash) ) delete_claim_ops = delete_prefix(db, DB_PREFIXES.claim_to_txo.value + self.claim_hash) delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash) diff --git a/lbry/wallet/server/db/common.py b/lbry/wallet/server/db/common.py index 5865c05fc..9f9c9bda3 100644 --- a/lbry/wallet/server/db/common.py +++ b/lbry/wallet/server/db/common.py @@ -438,6 +438,7 @@ class ResolveResult(typing.NamedTuple): expiration_height: int effective_amount: int support_amount: int + reposted: int last_takeover_height: typing.Optional[int] claims_in_channel: typing.Optional[int] channel_hash: typing.Optional[bytes] diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 8e9cb77c2..9d10e378b 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -211,7 +211,8 @@ class SearchIndex: last_takeover_height=r['last_take_over_height'], claims_in_channel=r['claims_in_channel'], channel_hash=r['channel_hash'], - reposted_claim_hash=r['reposted_claim_hash'] + reposted_claim_hash=r['reposted_claim_hash'], + reposted=r['reposted'] ) for r in response ] extra = [ @@ -234,7 +235,8 @@ class SearchIndex: last_takeover_height=r['last_take_over_height'], claims_in_channel=r['claims_in_channel'], channel_hash=r['channel_hash'], - reposted_claim_hash=r['reposted_claim_hash'] + reposted_claim_hash=r['reposted_claim_hash'], + reposted=r['reposted'] ) for r in await self._get_referenced_rows(total_referenced) ] result = Outputs.to_base64( @@ -471,7 +473,7 @@ class SearchIndex: def extract_doc(doc, index): doc['claim_id'] = doc.pop('claim_hash')[::-1].hex() if doc['reposted_claim_hash'] is not None: - doc['reposted_claim_id'] = doc.pop('reposted_claim_hash')[::-1].hex() + doc['reposted_claim_id'] = doc.pop('reposted_claim_hash').hex() else: doc['reposted_claim_id'] = None channel_hash = doc.pop('channel_hash') diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index a766068cb..fb12df2b8 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -193,6 +193,24 @@ class EffectiveAmountValue(typing.NamedTuple): claim_hash: bytes +class RepostKey(typing.NamedTuple): + claim_hash: bytes + + +class RepostValue(typing.NamedTuple): + reposted_claim_hash: bytes + + +class RepostedKey(typing.NamedTuple): + reposted_claim_hash: bytes + tx_num: int + position: int + + +class RepostedValue(typing.NamedTuple): + claim_hash: bytes + + class ActiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.active_amount.value key_struct = struct.Struct(b'>20sBLLH') @@ -676,6 +694,64 @@ class EffectiveAmountPrefixRow(PrefixRow): return cls.pack_key(name, effective_amount, tx_num, position), cls.pack_value(claim_hash) +class RepostPrefixRow(PrefixRow): + prefix = DB_PREFIXES.repost.value + + @classmethod + def pack_key(cls, claim_hash: bytes): + return cls.prefix + claim_hash + + @classmethod + def unpack_key(cls, key: bytes) -> RepostKey: + assert key[0] == cls.prefix + assert len(key) == 21 + return RepostKey[1:] + + @classmethod + def pack_value(cls, reposted_claim_hash: bytes) -> bytes: + return reposted_claim_hash + + @classmethod + def unpack_value(cls, data: bytes) -> RepostValue: + return RepostValue(data) + + @classmethod + def pack_item(cls, claim_hash: bytes, reposted_claim_hash: bytes): + return cls.pack_key(claim_hash), cls.pack_value(reposted_claim_hash) + + +class RepostedPrefixRow(PrefixRow): + prefix = DB_PREFIXES.reposted_claim.value + key_struct = struct.Struct(b'>20sLH') + value_struct = struct.Struct(b'>20s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack, + struct.Struct(b'>20sL').pack, + struct.Struct(b'>20sLH').pack + ] + + @classmethod + def pack_key(cls, reposted_claim_hash: bytes, tx_num: int, position: int): + return super().pack_key(reposted_claim_hash, tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> RepostedKey: + return RepostedKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, claim_hash: bytes) -> bytes: + return super().pack_value(claim_hash) + + @classmethod + def unpack_value(cls, data: bytes) -> RepostedValue: + return RepostedValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, reposted_claim_hash: bytes, tx_num: int, position: int, claim_hash: bytes): + return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -696,4 +772,7 @@ class Prefixes: effective_amount = EffectiveAmountPrefixRow + repost = RepostPrefixRow + reposted_claim = RepostedPrefixRow + # undo_claimtrie = b'M' diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 83dd752ab..d38c93bbb 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -40,7 +40,7 @@ from lbry.wallet.server.db.common import ResolveResult from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue -from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, length_encoded_name +from lbry.wallet.server.db.claimtrie import length_encoded_name from lbry.wallet.server.db.elasticsearch import SearchIndex @@ -58,8 +58,6 @@ TXO_STRUCT_unpack = TXO_STRUCT.unpack TXO_STRUCT_pack = TXO_STRUCT.pack - - @attr.s(slots=True) class FlushData: height = attr.ib() @@ -158,6 +156,18 @@ class LevelDB: return return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name) + def get_repost(self, claim_hash) -> Optional[bytes]: + repost = self.db.get(Prefixes.repost.pack_key(claim_hash)) + if repost: + return Prefixes.repost.unpack_value(repost).reposted_claim_hash + return + + def get_reposted_count(self, claim_hash: bytes) -> int: + cnt = 0 + for _ in self.db.iterator(prefix=Prefixes.reposted_claim.pack_partial_key(claim_hash)): + cnt += 1 + return cnt + def get_activation(self, tx_num, position, is_support=False) -> int: activation = self.db.get( Prefixes.activated.pack_key( @@ -208,6 +218,7 @@ class LevelDB: effective_amount = support_amount + claim_amount channel_hash = self.get_channel_for_claim(claim_hash) + reposted_claim_hash = self.get_repost(claim_hash) claims_in_channel = None short_url = f'{name}#{claim_hash.hex()}' @@ -224,7 +235,8 @@ class LevelDB: last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel, creation_height=created_height, activation_height=activation_height, expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, - channel_hash=channel_hash, reposted_claim_hash=None + channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash, + reposted=self.get_reposted_count(claim_hash) ) def _resolve(self, normalized_name: str, claim_id: Optional[str] = None, @@ -339,26 +351,6 @@ class LevelDB: self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id) ) - def make_staged_claim_item(self, claim_hash: bytes) -> Optional[StagedClaimtrieItem]: - claim_info = self.get_claim_txo(claim_hash) - k, v = claim_info - root_tx_num = v.root_tx_num - root_idx = v.root_position - value = v.amount - name = v.name - tx_num = k.tx_num - idx = k.position - height = bisect_right(self.tx_counts, tx_num) - signing_hash = self.get_channel_for_claim(claim_hash) - # if signing_hash: - # count = self.get_claims_in_channel_count(signing_hash) - # else: - # count = 0 - return StagedClaimtrieItem( - name, claim_hash, value, self.coin.get_expiration_height(height), tx_num, idx, - root_tx_num, root_idx, signing_hash - ) - def get_claim_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash, tx_num, position)) if v: diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 2836f7671..8cee0b4f1 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -1484,9 +1484,9 @@ class StreamCommands(ClaimTestCase): filtering_channel_id = self.get_claim_id( await self.channel_create('@filtering', '0.1') ) - self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add( - unhexlify(filtering_channel_id)[::-1] - ) + # self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add( + # unhexlify(filtering_channel_id)[::-1] + # ) self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.filtered_streams)) await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering') self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.filtered_streams))