From 9ad31008a5555da14bdf523be7e3e9fb9e33d72a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 28 May 2021 14:10:35 -0400 Subject: [PATCH] fix updating the ES search index -update search index to use ResolveResult tuples --- lbry/wallet/server/block_processor.py | 83 +++++++++++++------ lbry/wallet/server/db/common.py | 24 ++++++ .../server/db/elasticsearch/constants.py | 4 +- lbry/wallet/server/db/elasticsearch/search.py | 51 +++++++++++- lbry/wallet/server/leveldb.py | 27 +----- lbry/wallet/server/session.py | 2 - 6 files changed, 137 insertions(+), 54 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 3236b1152..cd730212c 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -9,6 +9,10 @@ from prometheus_client import Gauge, Histogram from collections import defaultdict import lbry from lbry.schema.claim import Claim +from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger +from lbry.wallet.constants import TXO_TYPES +from lbry.wallet.server.db.common import STREAM_TYPES + from lbry.wallet.transaction import OutputScript, Output from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.daemon import DaemonError @@ -174,6 +178,13 @@ class BlockProcessor: self.notifications = notifications self.coin = env.coin + if env.coin.NET == 'mainnet': + self.ledger = Ledger + elif env.coin.NET == 'testnet': + self.ledger = TestNetLedger + else: + self.ledger = RegTestLedger + self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.logger = class_logger(__name__, self.__class__.__name__) @@ -247,12 +258,31 @@ class BlockProcessor: yield 'delete', claim_hash.hex() for claim_hash in self.touched_claims_to_send_es: claim = self.db._fs_get_claim_by_hash(claim_hash) + raw_claim_tx = self.db.db.get(DB_PREFIXES.TX_PREFIX.value + claim.tx_hash) + try: + claim_txo: TxOutput = self.coin.transaction(raw_claim_tx).outputs[claim.position] + script = OutputScript(claim_txo.pk_script) + script.parse() + except: + self.logger.exception( + "tx parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex() + ) + continue + try: + metadata = Claim.from_bytes(script.values['claim']) + except: + self.logger.exception( + "claim parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex() + ) + continue + yield ('update', { - 'claim_hash': claim_hash, + 'claim_hash': claim_hash[::-1], # 'claim_id': claim_hash.hex(), 'claim_name': claim.name, 'normalized': claim.name, 'tx_id': claim.tx_hash[::-1].hex(), + 'tx_num': claim.tx_num, 'tx_nout': claim.position, 'amount': claim.amount, 'timestamp': 0, @@ -269,35 +299,38 @@ class BlockProcessor: 'short_url': '', 'canonical_url': '', - 'release_time': 0, - 'title': '', - 'author': '', - 'description': '', - 'claim_type': 0, - 'has_source': False, - 'stream_type': '', - 'media_type': '', - 'fee_amount': 0, - 'fee_currency': '', - 'duration': 0, + 'release_time': None if not metadata.is_stream else metadata.stream.release_time, + 'title': None if not metadata.is_stream else metadata.stream.title, + 'author': None if not metadata.is_stream else metadata.stream.author, + 'description': None if not metadata.is_stream else metadata.stream.description, + 'claim_type': TXO_TYPES[metadata.claim_type], + 'has_source': None if not metadata.is_stream else metadata.stream.has_source, + 'stream_type': None if not metadata.is_stream else STREAM_TYPES.get(metadata.stream.stream_type, None), + 'media_type': None if not metadata.is_stream else metadata.stream.source.media_type, + 'fee_amount': None if not metadata.is_stream else metadata.stream.fee.amount, + 'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency, + 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration), 'reposted': 0, 'reposted_claim_hash': None, 'reposted_claim_type': None, 'reposted_has_source': False, - 'channel_hash': None, + 'channel_hash': metadata.signing_channel_hash, - 'public_key_bytes': None, - 'public_key_hash': None, - 'signature': None, + 'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes, + 'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160( + self.ledger.public_key_to_address(metadata.channel.public_key_bytes) + ), + 'signature': metadata.signature, 'signature_digest': None, 'signature_valid': False, 'claims_in_channel': 0, - 'tags': [], - 'languages': [], - + 'tags': [] if not metadata.is_stream else [tag for tag in metadata.stream.tags], + 'languages': [] if not metadata.is_stream else ( + [lang.language or 'none' for lang in metadata.stream.languages] or ['none'] + ), 'censor_type': 0, 'censoring_channel_hash': None, # 'trending_group': 0, @@ -885,10 +918,10 @@ class BlockProcessor: for txo in activated: v = txo[1], PendingActivationValue(claim_hash, name), txo[0] future_activations[name][claim_hash] = v - if v[2].is_claim: - self.possible_future_activated_claim[(name, claim_hash)] = v[0] + if txo[0].is_claim: + self.possible_future_activated_claim[(name, claim_hash)] = txo[1] else: - self.possible_future_activated_support[claim_hash].append(v[0]) + self.possible_future_activated_support[claim_hash].append(txo[1]) # process takeovers checked_names = set() @@ -927,7 +960,6 @@ class BlockProcessor: position = claim[0].position amount = claim[1].amount activation = self.db.get_activation(tx_num, position) - else: tx_num, position = self.pending_claim_txos[winning_including_future_activations] amount = None @@ -1024,8 +1056,9 @@ class BlockProcessor: # gather cumulative removed/touched sets to update the search index self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys())) self.touched_claims_to_send_es.update( - set(self.staged_activated_support.keys()).union(set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())).difference( - self.removed_claims_to_send_es) + set(self.staged_activated_support.keys()).union( + set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys()) + ).difference(self.removed_claims_to_send_es) ) # for use the cumulative changes to now update bid ordered resolve diff --git a/lbry/wallet/server/db/common.py b/lbry/wallet/server/db/common.py index c0fdc4f3f..5865c05fc 100644 --- a/lbry/wallet/server/db/common.py +++ b/lbry/wallet/server/db/common.py @@ -1,3 +1,5 @@ +import typing + CLAIM_TYPES = { 'stream': 1, 'channel': 2, @@ -418,3 +420,25 @@ INDEXED_LANGUAGES = [ 'zh', 'zu' ] + + +class ResolveResult(typing.NamedTuple): + name: str + claim_hash: bytes + tx_num: int + position: int + tx_hash: bytes + height: int + amount: int + short_url: str + is_controlling: bool + canonical_url: str + creation_height: int + activation_height: int + expiration_height: int + effective_amount: int + support_amount: int + last_takeover_height: typing.Optional[int] + claims_in_channel: typing.Optional[int] + channel_hash: typing.Optional[bytes] + reposted_claim_hash: typing.Optional[bytes] diff --git a/lbry/wallet/server/db/elasticsearch/constants.py b/lbry/wallet/server/db/elasticsearch/constants.py index 35f1b054d..f20cf822f 100644 --- a/lbry/wallet/server/db/elasticsearch/constants.py +++ b/lbry/wallet/server/db/elasticsearch/constants.py @@ -53,7 +53,7 @@ FIELDS = { 'duration', 'release_time', 'tags', 'languages', 'has_source', 'reposted_claim_type', 'reposted_claim_id', 'repost_count', - 'trending_group', 'trending_mixed', 'trending_local', 'trending_global', + 'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'tx_num' } TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', 'censoring_channel_id', @@ -66,7 +66,7 @@ RANGE_FIELDS = { 'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel', 'amount', 'effective_amount', 'support_amount', 'trending_group', 'trending_mixed', 'censor_type', - 'trending_local', 'trending_global', + 'trending_local', 'trending_global', 'tx_num' } ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 99ca99887..8e9cb77c2 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -19,6 +19,7 @@ from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \ RANGE_FIELDS, ALL_FIELDS from lbry.wallet.server.util import class_logger +from lbry.wallet.server.db.common import ResolveResult class ChannelResolution(str): @@ -185,11 +186,59 @@ class SearchIndex: response, offset, total = await self.search(**kwargs) censor.apply(response) total_referenced.extend(response) + if censor.censored: response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) total_referenced.extend(response) + + response = [ + ResolveResult( + name=r['claim_name'], + claim_hash=r['claim_hash'], + tx_num=r['tx_num'], + position=r['tx_nout'], + tx_hash=r['tx_hash'], + height=r['height'], + amount=r['amount'], + short_url=r['short_url'], + is_controlling=r['is_controlling'], + canonical_url=r['canonical_url'], + creation_height=r['creation_height'], + activation_height=r['activation_height'], + expiration_height=r['expiration_height'], + effective_amount=r['effective_amount'], + support_amount=r['support_amount'], + last_takeover_height=r['last_take_over_height'], + claims_in_channel=r['claims_in_channel'], + channel_hash=r['channel_hash'], + reposted_claim_hash=r['reposted_claim_hash'] + ) for r in response + ] + extra = [ + ResolveResult( + name=r['claim_name'], + claim_hash=r['claim_hash'], + tx_num=r['tx_num'], + position=r['tx_nout'], + tx_hash=r['tx_hash'], + height=r['height'], + amount=r['amount'], + short_url=r['short_url'], + is_controlling=r['is_controlling'], + canonical_url=r['canonical_url'], + creation_height=r['creation_height'], + activation_height=r['activation_height'], + expiration_height=r['expiration_height'], + effective_amount=r['effective_amount'], + support_amount=r['support_amount'], + last_takeover_height=r['last_take_over_height'], + claims_in_channel=r['claims_in_channel'], + channel_hash=r['channel_hash'], + reposted_claim_hash=r['reposted_claim_hash'] + ) for r in await self._get_referenced_rows(total_referenced) + ] result = Outputs.to_base64( - response, await self._get_referenced_rows(total_referenced), offset, total, censor + response, extra, offset, total, censor ) cache_item.result = result return result diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 2c0740ada..83dd752ab 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -36,6 +36,7 @@ from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_ui from lbry.wallet.server.storage import db_class from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db.common import ResolveResult from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue @@ -75,28 +76,6 @@ class FlushData: undo = attr.ib() -class ResolveResult(typing.NamedTuple): - name: str - claim_hash: bytes - tx_num: int - position: int - tx_hash: bytes - height: int - amount: int - short_url: str - is_controlling: bool - canonical_url: str - creation_height: int - activation_height: int - expiration_height: int - effective_amount: int - support_amount: int - last_takeover_height: Optional[int] - claims_in_channel: Optional[int] - channel_hash: Optional[bytes] - reposted_claim_hash: Optional[bytes] - - OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]] DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll') @@ -259,9 +238,9 @@ class LevelDB: # winning resolution controlling = self.get_controlling_claim(normalized_name) if not controlling: - print("none controlling") + print(f"none controlling for lbry://{normalized_name}") return - print("resolved controlling", controlling.claim_hash.hex()) + print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}") return self._fs_get_claim_by_hash(controlling.claim_hash) amount_order = max(int(amount_order or 1), 1) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index d5b969bf0..b40173933 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1035,7 +1035,6 @@ class LBRYElectrumX(SessionBase): async def claimtrie_resolve(self, *urls): rows, extra = [], [] for url in urls: - print("resolve", url) self.session_mgr.urls_to_resolve_count_metric.inc() stream, channel = await self.db.fs_resolve(url) self.session_mgr.resolved_url_count_metric.inc() @@ -1071,7 +1070,6 @@ class LBRYElectrumX(SessionBase): if not stream: stream = LookupError(f"Could not find claim at {claim_id}") rows.append(stream) - # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) return Outputs.to_base64(rows, extra, 0, None, None) def assert_tx_hash(self, value):