fix updating the ES search index

-update search index to use ResolveResult tuples
This commit is contained in:
Jack Robison 2021-05-28 14:10:35 -04:00
parent 6f5bca0f67
commit eb1ba143ec
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 137 additions and 54 deletions

View file

@ -9,6 +9,10 @@ from prometheus_client import Gauge, Histogram
from collections import defaultdict from collections import defaultdict
import lbry import lbry
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.wallet.constants import TXO_TYPES
from lbry.wallet.server.db.common import STREAM_TYPES
from lbry.wallet.transaction import OutputScript, Output from lbry.wallet.transaction import OutputScript, Output
from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
@ -174,6 +178,13 @@ class BlockProcessor:
self.notifications = notifications self.notifications = notifications
self.coin = env.coin self.coin = env.coin
if env.coin.NET == 'mainnet':
self.ledger = Ledger
elif env.coin.NET == 'testnet':
self.ledger = TestNetLedger
else:
self.ledger = RegTestLedger
self.blocks_event = asyncio.Event() self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
@ -247,12 +258,31 @@ class BlockProcessor:
yield 'delete', claim_hash.hex() yield 'delete', claim_hash.hex()
for claim_hash in self.touched_claims_to_send_es: for claim_hash in self.touched_claims_to_send_es:
claim = self.db._fs_get_claim_by_hash(claim_hash) claim = self.db._fs_get_claim_by_hash(claim_hash)
raw_claim_tx = self.db.db.get(DB_PREFIXES.TX_PREFIX.value + claim.tx_hash)
try:
claim_txo: TxOutput = self.coin.transaction(raw_claim_tx).outputs[claim.position]
script = OutputScript(claim_txo.pk_script)
script.parse()
except:
self.logger.exception(
"tx parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex()
)
continue
try:
metadata = Claim.from_bytes(script.values['claim'])
except:
self.logger.exception(
"claim parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex()
)
continue
yield ('update', { yield ('update', {
'claim_hash': claim_hash, 'claim_hash': claim_hash[::-1],
# 'claim_id': claim_hash.hex(), # 'claim_id': claim_hash.hex(),
'claim_name': claim.name, 'claim_name': claim.name,
'normalized': claim.name, 'normalized': claim.name,
'tx_id': claim.tx_hash[::-1].hex(), 'tx_id': claim.tx_hash[::-1].hex(),
'tx_num': claim.tx_num,
'tx_nout': claim.position, 'tx_nout': claim.position,
'amount': claim.amount, 'amount': claim.amount,
'timestamp': 0, 'timestamp': 0,
@ -269,35 +299,38 @@ class BlockProcessor:
'short_url': '', 'short_url': '',
'canonical_url': '', 'canonical_url': '',
'release_time': 0, 'release_time': None if not metadata.is_stream else metadata.stream.release_time,
'title': '', 'title': None if not metadata.is_stream else metadata.stream.title,
'author': '', 'author': None if not metadata.is_stream else metadata.stream.author,
'description': '', 'description': None if not metadata.is_stream else metadata.stream.description,
'claim_type': 0, 'claim_type': TXO_TYPES[metadata.claim_type],
'has_source': False, 'has_source': None if not metadata.is_stream else metadata.stream.has_source,
'stream_type': '', 'stream_type': None if not metadata.is_stream else STREAM_TYPES.get(metadata.stream.stream_type, None),
'media_type': '', 'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
'fee_amount': 0, 'fee_amount': None if not metadata.is_stream else metadata.stream.fee.amount,
'fee_currency': '', 'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
'duration': 0, 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration),
'reposted': 0, 'reposted': 0,
'reposted_claim_hash': None, 'reposted_claim_hash': None,
'reposted_claim_type': None, 'reposted_claim_type': None,
'reposted_has_source': False, 'reposted_has_source': False,
'channel_hash': None, 'channel_hash': metadata.signing_channel_hash,
'public_key_bytes': None, 'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
'public_key_hash': None, 'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
'signature': None, self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
),
'signature': metadata.signature,
'signature_digest': None, 'signature_digest': None,
'signature_valid': False, 'signature_valid': False,
'claims_in_channel': 0, 'claims_in_channel': 0,
'tags': [], 'tags': [] if not metadata.is_stream else [tag for tag in metadata.stream.tags],
'languages': [], 'languages': [] if not metadata.is_stream else (
[lang.language or 'none' for lang in metadata.stream.languages] or ['none']
),
'censor_type': 0, 'censor_type': 0,
'censoring_channel_hash': None, 'censoring_channel_hash': None,
# 'trending_group': 0, # 'trending_group': 0,
@ -885,10 +918,10 @@ class BlockProcessor:
for txo in activated: for txo in activated:
v = txo[1], PendingActivationValue(claim_hash, name), txo[0] v = txo[1], PendingActivationValue(claim_hash, name), txo[0]
future_activations[name][claim_hash] = v future_activations[name][claim_hash] = v
if v[2].is_claim: if txo[0].is_claim:
self.possible_future_activated_claim[(name, claim_hash)] = v[0] self.possible_future_activated_claim[(name, claim_hash)] = txo[1]
else: else:
self.possible_future_activated_support[claim_hash].append(v[0]) self.possible_future_activated_support[claim_hash].append(txo[1])
# process takeovers # process takeovers
checked_names = set() checked_names = set()
@ -927,7 +960,6 @@ class BlockProcessor:
position = claim[0].position position = claim[0].position
amount = claim[1].amount amount = claim[1].amount
activation = self.db.get_activation(tx_num, position) activation = self.db.get_activation(tx_num, position)
else: else:
tx_num, position = self.pending_claim_txos[winning_including_future_activations] tx_num, position = self.pending_claim_txos[winning_including_future_activations]
amount = None amount = None
@ -1024,8 +1056,9 @@ class BlockProcessor:
# gather cumulative removed/touched sets to update the search index # gather cumulative removed/touched sets to update the search index
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys())) self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
self.touched_claims_to_send_es.update( self.touched_claims_to_send_es.update(
set(self.staged_activated_support.keys()).union(set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())).difference( set(self.staged_activated_support.keys()).union(
self.removed_claims_to_send_es) set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())
).difference(self.removed_claims_to_send_es)
) )
# for use the cumulative changes to now update bid ordered resolve # for use the cumulative changes to now update bid ordered resolve

View file

@ -1,3 +1,5 @@
import typing
CLAIM_TYPES = { CLAIM_TYPES = {
'stream': 1, 'stream': 1,
'channel': 2, 'channel': 2,
@ -418,3 +420,25 @@ INDEXED_LANGUAGES = [
'zh', 'zh',
'zu' 'zu'
] ]
class ResolveResult(typing.NamedTuple):
    """Flat, immutable record describing one resolved claim.

    Shared between the wallet-server DB layer and the ES search index
    (fields mirror the search-index document keys such as ``claim_hash``,
    ``tx_num`` and ``tx_nout``). NOTE(review): field semantics below are
    inferred from how the tuple is populated elsewhere — confirm against
    the DB layer.
    """
    # claim name (presumably as stored on chain — verify normalization)
    name: str
    # 20-byte claim hash identifying the claim
    claim_hash: bytes
    # transaction number of the claim's current txo
    tx_num: int
    # output index (nout) of the claim within its transaction
    position: int
    # hash of the transaction containing the claim txo
    tx_hash: bytes
    # block height of the claim txo
    height: int
    # claim amount in base units
    amount: int
    short_url: str
    # whether this claim currently controls its name — TODO confirm
    is_controlling: bool
    canonical_url: str
    creation_height: int
    activation_height: int
    expiration_height: int
    # amount + active supports — assumption, verify against DB layer
    effective_amount: int
    support_amount: int
    # None when not applicable (e.g. name never taken over) — verify
    last_takeover_height: typing.Optional[int]
    # only meaningful for channel claims; None otherwise — verify
    claims_in_channel: typing.Optional[int]
    # signing channel's claim hash, if the claim is signed
    channel_hash: typing.Optional[bytes]
    # hash of the reposted claim, if this claim is a repost
    reposted_claim_hash: typing.Optional[bytes]

View file

@ -53,7 +53,7 @@ FIELDS = {
'duration', 'release_time', 'duration', 'release_time',
'tags', 'languages', 'has_source', 'reposted_claim_type', 'tags', 'languages', 'has_source', 'reposted_claim_type',
'reposted_claim_id', 'repost_count', 'reposted_claim_id', 'repost_count',
'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'tx_num'
} }
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', 'censoring_channel_id', TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', 'censoring_channel_id',
@ -66,7 +66,7 @@ RANGE_FIELDS = {
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel', 'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount', 'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'censor_type', 'trending_group', 'trending_mixed', 'censor_type',
'trending_local', 'trending_global', 'trending_local', 'trending_global', 'tx_num'
} }
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS

View file

@ -19,6 +19,7 @@ from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \ from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
RANGE_FIELDS, ALL_FIELDS RANGE_FIELDS, ALL_FIELDS
from lbry.wallet.server.util import class_logger from lbry.wallet.server.util import class_logger
from lbry.wallet.server.db.common import ResolveResult
class ChannelResolution(str): class ChannelResolution(str):
@ -185,11 +186,59 @@ class SearchIndex:
response, offset, total = await self.search(**kwargs) response, offset, total = await self.search(**kwargs)
censor.apply(response) censor.apply(response)
total_referenced.extend(response) total_referenced.extend(response)
if censor.censored: if censor.censored:
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
total_referenced.extend(response) total_referenced.extend(response)
response = [
ResolveResult(
name=r['claim_name'],
claim_hash=r['claim_hash'],
tx_num=r['tx_num'],
position=r['tx_nout'],
tx_hash=r['tx_hash'],
height=r['height'],
amount=r['amount'],
short_url=r['short_url'],
is_controlling=r['is_controlling'],
canonical_url=r['canonical_url'],
creation_height=r['creation_height'],
activation_height=r['activation_height'],
expiration_height=r['expiration_height'],
effective_amount=r['effective_amount'],
support_amount=r['support_amount'],
last_takeover_height=r['last_take_over_height'],
claims_in_channel=r['claims_in_channel'],
channel_hash=r['channel_hash'],
reposted_claim_hash=r['reposted_claim_hash']
) for r in response
]
extra = [
ResolveResult(
name=r['claim_name'],
claim_hash=r['claim_hash'],
tx_num=r['tx_num'],
position=r['tx_nout'],
tx_hash=r['tx_hash'],
height=r['height'],
amount=r['amount'],
short_url=r['short_url'],
is_controlling=r['is_controlling'],
canonical_url=r['canonical_url'],
creation_height=r['creation_height'],
activation_height=r['activation_height'],
expiration_height=r['expiration_height'],
effective_amount=r['effective_amount'],
support_amount=r['support_amount'],
last_takeover_height=r['last_take_over_height'],
claims_in_channel=r['claims_in_channel'],
channel_hash=r['channel_hash'],
reposted_claim_hash=r['reposted_claim_hash']
) for r in await self._get_referenced_rows(total_referenced)
]
result = Outputs.to_base64( result = Outputs.to_base64(
response, await self._get_referenced_rows(total_referenced), offset, total, censor response, extra, offset, total, censor
) )
cache_item.result = result cache_item.result = result
return result return result

View file

@ -36,6 +36,7 @@ from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_ui
from lbry.wallet.server.storage import db_class from lbry.wallet.server.storage import db_class
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
@ -75,28 +76,6 @@ class FlushData:
undo = attr.ib() undo = attr.ib()
class ResolveResult(typing.NamedTuple):
name: str
claim_hash: bytes
tx_num: int
position: int
tx_hash: bytes
height: int
amount: int
short_url: str
is_controlling: bool
canonical_url: str
creation_height: int
activation_height: int
expiration_height: int
effective_amount: int
support_amount: int
last_takeover_height: Optional[int]
claims_in_channel: Optional[int]
channel_hash: Optional[bytes]
reposted_claim_hash: Optional[bytes]
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]] OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll') DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')
@ -259,9 +238,9 @@ class LevelDB:
# winning resolution # winning resolution
controlling = self.get_controlling_claim(normalized_name) controlling = self.get_controlling_claim(normalized_name)
if not controlling: if not controlling:
print("none controlling") print(f"none controlling for lbry://{normalized_name}")
return return
print("resolved controlling", controlling.claim_hash.hex()) print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}")
return self._fs_get_claim_by_hash(controlling.claim_hash) return self._fs_get_claim_by_hash(controlling.claim_hash)
amount_order = max(int(amount_order or 1), 1) amount_order = max(int(amount_order or 1), 1)

View file

@ -1035,7 +1035,6 @@ class LBRYElectrumX(SessionBase):
async def claimtrie_resolve(self, *urls): async def claimtrie_resolve(self, *urls):
rows, extra = [], [] rows, extra = [], []
for url in urls: for url in urls:
print("resolve", url)
self.session_mgr.urls_to_resolve_count_metric.inc() self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel = await self.db.fs_resolve(url) stream, channel = await self.db.fs_resolve(url)
self.session_mgr.resolved_url_count_metric.inc() self.session_mgr.resolved_url_count_metric.inc()
@ -1071,7 +1070,6 @@ class LBRYElectrumX(SessionBase):
if not stream: if not stream:
stream = LookupError(f"Could not find claim at {claim_id}") stream = LookupError(f"Could not find claim at {claim_id}")
rows.append(stream) rows.append(stream)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None) return Outputs.to_base64(rows, extra, 0, None, None)
def assert_tx_hash(self, value): def assert_tx_hash(self, value):