forked from LBRYCommunity/lbry-sdk
fix updating the ES search index
-update search index to use ResolveResult tuples
This commit is contained in:
parent
966f47a5b1
commit
9ad31008a5
6 changed files with 137 additions and 54 deletions
|
@ -9,6 +9,10 @@ from prometheus_client import Gauge, Histogram
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import lbry
|
import lbry
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
|
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
||||||
|
from lbry.wallet.constants import TXO_TYPES
|
||||||
|
from lbry.wallet.server.db.common import STREAM_TYPES
|
||||||
|
|
||||||
from lbry.wallet.transaction import OutputScript, Output
|
from lbry.wallet.transaction import OutputScript, Output
|
||||||
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
||||||
from lbry.wallet.server.daemon import DaemonError
|
from lbry.wallet.server.daemon import DaemonError
|
||||||
|
@ -174,6 +178,13 @@ class BlockProcessor:
|
||||||
self.notifications = notifications
|
self.notifications = notifications
|
||||||
|
|
||||||
self.coin = env.coin
|
self.coin = env.coin
|
||||||
|
if env.coin.NET == 'mainnet':
|
||||||
|
self.ledger = Ledger
|
||||||
|
elif env.coin.NET == 'testnet':
|
||||||
|
self.ledger = TestNetLedger
|
||||||
|
else:
|
||||||
|
self.ledger = RegTestLedger
|
||||||
|
|
||||||
self.blocks_event = asyncio.Event()
|
self.blocks_event = asyncio.Event()
|
||||||
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
|
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
|
@ -247,12 +258,31 @@ class BlockProcessor:
|
||||||
yield 'delete', claim_hash.hex()
|
yield 'delete', claim_hash.hex()
|
||||||
for claim_hash in self.touched_claims_to_send_es:
|
for claim_hash in self.touched_claims_to_send_es:
|
||||||
claim = self.db._fs_get_claim_by_hash(claim_hash)
|
claim = self.db._fs_get_claim_by_hash(claim_hash)
|
||||||
|
raw_claim_tx = self.db.db.get(DB_PREFIXES.TX_PREFIX.value + claim.tx_hash)
|
||||||
|
try:
|
||||||
|
claim_txo: TxOutput = self.coin.transaction(raw_claim_tx).outputs[claim.position]
|
||||||
|
script = OutputScript(claim_txo.pk_script)
|
||||||
|
script.parse()
|
||||||
|
except:
|
||||||
|
self.logger.exception(
|
||||||
|
"tx parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex()
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
metadata = Claim.from_bytes(script.values['claim'])
|
||||||
|
except:
|
||||||
|
self.logger.exception(
|
||||||
|
"claim parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex()
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
yield ('update', {
|
yield ('update', {
|
||||||
'claim_hash': claim_hash,
|
'claim_hash': claim_hash[::-1],
|
||||||
# 'claim_id': claim_hash.hex(),
|
# 'claim_id': claim_hash.hex(),
|
||||||
'claim_name': claim.name,
|
'claim_name': claim.name,
|
||||||
'normalized': claim.name,
|
'normalized': claim.name,
|
||||||
'tx_id': claim.tx_hash[::-1].hex(),
|
'tx_id': claim.tx_hash[::-1].hex(),
|
||||||
|
'tx_num': claim.tx_num,
|
||||||
'tx_nout': claim.position,
|
'tx_nout': claim.position,
|
||||||
'amount': claim.amount,
|
'amount': claim.amount,
|
||||||
'timestamp': 0,
|
'timestamp': 0,
|
||||||
|
@ -269,35 +299,38 @@ class BlockProcessor:
|
||||||
'short_url': '',
|
'short_url': '',
|
||||||
'canonical_url': '',
|
'canonical_url': '',
|
||||||
|
|
||||||
'release_time': 0,
|
'release_time': None if not metadata.is_stream else metadata.stream.release_time,
|
||||||
'title': '',
|
'title': None if not metadata.is_stream else metadata.stream.title,
|
||||||
'author': '',
|
'author': None if not metadata.is_stream else metadata.stream.author,
|
||||||
'description': '',
|
'description': None if not metadata.is_stream else metadata.stream.description,
|
||||||
'claim_type': 0,
|
'claim_type': TXO_TYPES[metadata.claim_type],
|
||||||
'has_source': False,
|
'has_source': None if not metadata.is_stream else metadata.stream.has_source,
|
||||||
'stream_type': '',
|
'stream_type': None if not metadata.is_stream else STREAM_TYPES.get(metadata.stream.stream_type, None),
|
||||||
'media_type': '',
|
'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
|
||||||
'fee_amount': 0,
|
'fee_amount': None if not metadata.is_stream else metadata.stream.fee.amount,
|
||||||
'fee_currency': '',
|
'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
|
||||||
'duration': 0,
|
'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration),
|
||||||
|
|
||||||
'reposted': 0,
|
'reposted': 0,
|
||||||
'reposted_claim_hash': None,
|
'reposted_claim_hash': None,
|
||||||
'reposted_claim_type': None,
|
'reposted_claim_type': None,
|
||||||
'reposted_has_source': False,
|
'reposted_has_source': False,
|
||||||
|
|
||||||
'channel_hash': None,
|
'channel_hash': metadata.signing_channel_hash,
|
||||||
|
|
||||||
'public_key_bytes': None,
|
'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
|
||||||
'public_key_hash': None,
|
'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
|
||||||
'signature': None,
|
self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
|
||||||
|
),
|
||||||
|
'signature': metadata.signature,
|
||||||
'signature_digest': None,
|
'signature_digest': None,
|
||||||
'signature_valid': False,
|
'signature_valid': False,
|
||||||
'claims_in_channel': 0,
|
'claims_in_channel': 0,
|
||||||
|
|
||||||
'tags': [],
|
'tags': [] if not metadata.is_stream else [tag for tag in metadata.stream.tags],
|
||||||
'languages': [],
|
'languages': [] if not metadata.is_stream else (
|
||||||
|
[lang.language or 'none' for lang in metadata.stream.languages] or ['none']
|
||||||
|
),
|
||||||
'censor_type': 0,
|
'censor_type': 0,
|
||||||
'censoring_channel_hash': None,
|
'censoring_channel_hash': None,
|
||||||
# 'trending_group': 0,
|
# 'trending_group': 0,
|
||||||
|
@ -885,10 +918,10 @@ class BlockProcessor:
|
||||||
for txo in activated:
|
for txo in activated:
|
||||||
v = txo[1], PendingActivationValue(claim_hash, name), txo[0]
|
v = txo[1], PendingActivationValue(claim_hash, name), txo[0]
|
||||||
future_activations[name][claim_hash] = v
|
future_activations[name][claim_hash] = v
|
||||||
if v[2].is_claim:
|
if txo[0].is_claim:
|
||||||
self.possible_future_activated_claim[(name, claim_hash)] = v[0]
|
self.possible_future_activated_claim[(name, claim_hash)] = txo[1]
|
||||||
else:
|
else:
|
||||||
self.possible_future_activated_support[claim_hash].append(v[0])
|
self.possible_future_activated_support[claim_hash].append(txo[1])
|
||||||
|
|
||||||
# process takeovers
|
# process takeovers
|
||||||
checked_names = set()
|
checked_names = set()
|
||||||
|
@ -927,7 +960,6 @@ class BlockProcessor:
|
||||||
position = claim[0].position
|
position = claim[0].position
|
||||||
amount = claim[1].amount
|
amount = claim[1].amount
|
||||||
activation = self.db.get_activation(tx_num, position)
|
activation = self.db.get_activation(tx_num, position)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
tx_num, position = self.pending_claim_txos[winning_including_future_activations]
|
tx_num, position = self.pending_claim_txos[winning_including_future_activations]
|
||||||
amount = None
|
amount = None
|
||||||
|
@ -1024,8 +1056,9 @@ class BlockProcessor:
|
||||||
# gather cumulative removed/touched sets to update the search index
|
# gather cumulative removed/touched sets to update the search index
|
||||||
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
|
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
|
||||||
self.touched_claims_to_send_es.update(
|
self.touched_claims_to_send_es.update(
|
||||||
set(self.staged_activated_support.keys()).union(set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())).difference(
|
set(self.staged_activated_support.keys()).union(
|
||||||
self.removed_claims_to_send_es)
|
set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())
|
||||||
|
).difference(self.removed_claims_to_send_es)
|
||||||
)
|
)
|
||||||
|
|
||||||
# for use the cumulative changes to now update bid ordered resolve
|
# for use the cumulative changes to now update bid ordered resolve
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import typing
|
||||||
|
|
||||||
CLAIM_TYPES = {
|
CLAIM_TYPES = {
|
||||||
'stream': 1,
|
'stream': 1,
|
||||||
'channel': 2,
|
'channel': 2,
|
||||||
|
@ -418,3 +420,25 @@ INDEXED_LANGUAGES = [
|
||||||
'zh',
|
'zh',
|
||||||
'zu'
|
'zu'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class ResolveResult(typing.NamedTuple):
|
||||||
|
name: str
|
||||||
|
claim_hash: bytes
|
||||||
|
tx_num: int
|
||||||
|
position: int
|
||||||
|
tx_hash: bytes
|
||||||
|
height: int
|
||||||
|
amount: int
|
||||||
|
short_url: str
|
||||||
|
is_controlling: bool
|
||||||
|
canonical_url: str
|
||||||
|
creation_height: int
|
||||||
|
activation_height: int
|
||||||
|
expiration_height: int
|
||||||
|
effective_amount: int
|
||||||
|
support_amount: int
|
||||||
|
last_takeover_height: typing.Optional[int]
|
||||||
|
claims_in_channel: typing.Optional[int]
|
||||||
|
channel_hash: typing.Optional[bytes]
|
||||||
|
reposted_claim_hash: typing.Optional[bytes]
|
||||||
|
|
|
@ -53,7 +53,7 @@ FIELDS = {
|
||||||
'duration', 'release_time',
|
'duration', 'release_time',
|
||||||
'tags', 'languages', 'has_source', 'reposted_claim_type',
|
'tags', 'languages', 'has_source', 'reposted_claim_type',
|
||||||
'reposted_claim_id', 'repost_count',
|
'reposted_claim_id', 'repost_count',
|
||||||
'trending_group', 'trending_mixed', 'trending_local', 'trending_global',
|
'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'tx_num'
|
||||||
}
|
}
|
||||||
|
|
||||||
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', 'censoring_channel_id',
|
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', 'censoring_channel_id',
|
||||||
|
@ -66,7 +66,7 @@ RANGE_FIELDS = {
|
||||||
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
|
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
|
||||||
'amount', 'effective_amount', 'support_amount',
|
'amount', 'effective_amount', 'support_amount',
|
||||||
'trending_group', 'trending_mixed', 'censor_type',
|
'trending_group', 'trending_mixed', 'censor_type',
|
||||||
'trending_local', 'trending_global',
|
'trending_local', 'trending_global', 'tx_num'
|
||||||
}
|
}
|
||||||
|
|
||||||
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
|
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
|
||||||
|
|
|
@ -19,6 +19,7 @@ from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
|
||||||
from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
|
from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
|
||||||
RANGE_FIELDS, ALL_FIELDS
|
RANGE_FIELDS, ALL_FIELDS
|
||||||
from lbry.wallet.server.util import class_logger
|
from lbry.wallet.server.util import class_logger
|
||||||
|
from lbry.wallet.server.db.common import ResolveResult
|
||||||
|
|
||||||
|
|
||||||
class ChannelResolution(str):
|
class ChannelResolution(str):
|
||||||
|
@ -185,11 +186,59 @@ class SearchIndex:
|
||||||
response, offset, total = await self.search(**kwargs)
|
response, offset, total = await self.search(**kwargs)
|
||||||
censor.apply(response)
|
censor.apply(response)
|
||||||
total_referenced.extend(response)
|
total_referenced.extend(response)
|
||||||
|
|
||||||
if censor.censored:
|
if censor.censored:
|
||||||
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
|
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
|
||||||
total_referenced.extend(response)
|
total_referenced.extend(response)
|
||||||
|
|
||||||
|
response = [
|
||||||
|
ResolveResult(
|
||||||
|
name=r['claim_name'],
|
||||||
|
claim_hash=r['claim_hash'],
|
||||||
|
tx_num=r['tx_num'],
|
||||||
|
position=r['tx_nout'],
|
||||||
|
tx_hash=r['tx_hash'],
|
||||||
|
height=r['height'],
|
||||||
|
amount=r['amount'],
|
||||||
|
short_url=r['short_url'],
|
||||||
|
is_controlling=r['is_controlling'],
|
||||||
|
canonical_url=r['canonical_url'],
|
||||||
|
creation_height=r['creation_height'],
|
||||||
|
activation_height=r['activation_height'],
|
||||||
|
expiration_height=r['expiration_height'],
|
||||||
|
effective_amount=r['effective_amount'],
|
||||||
|
support_amount=r['support_amount'],
|
||||||
|
last_takeover_height=r['last_take_over_height'],
|
||||||
|
claims_in_channel=r['claims_in_channel'],
|
||||||
|
channel_hash=r['channel_hash'],
|
||||||
|
reposted_claim_hash=r['reposted_claim_hash']
|
||||||
|
) for r in response
|
||||||
|
]
|
||||||
|
extra = [
|
||||||
|
ResolveResult(
|
||||||
|
name=r['claim_name'],
|
||||||
|
claim_hash=r['claim_hash'],
|
||||||
|
tx_num=r['tx_num'],
|
||||||
|
position=r['tx_nout'],
|
||||||
|
tx_hash=r['tx_hash'],
|
||||||
|
height=r['height'],
|
||||||
|
amount=r['amount'],
|
||||||
|
short_url=r['short_url'],
|
||||||
|
is_controlling=r['is_controlling'],
|
||||||
|
canonical_url=r['canonical_url'],
|
||||||
|
creation_height=r['creation_height'],
|
||||||
|
activation_height=r['activation_height'],
|
||||||
|
expiration_height=r['expiration_height'],
|
||||||
|
effective_amount=r['effective_amount'],
|
||||||
|
support_amount=r['support_amount'],
|
||||||
|
last_takeover_height=r['last_take_over_height'],
|
||||||
|
claims_in_channel=r['claims_in_channel'],
|
||||||
|
channel_hash=r['channel_hash'],
|
||||||
|
reposted_claim_hash=r['reposted_claim_hash']
|
||||||
|
) for r in await self._get_referenced_rows(total_referenced)
|
||||||
|
]
|
||||||
result = Outputs.to_base64(
|
result = Outputs.to_base64(
|
||||||
response, await self._get_referenced_rows(total_referenced), offset, total, censor
|
response, extra, offset, total, censor
|
||||||
)
|
)
|
||||||
cache_item.result = result
|
cache_item.result = result
|
||||||
return result
|
return result
|
||||||
|
|
|
@ -36,6 +36,7 @@ from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_ui
|
||||||
from lbry.wallet.server.storage import db_class
|
from lbry.wallet.server.storage import db_class
|
||||||
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
|
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
from lbry.wallet.server.db import DB_PREFIXES
|
||||||
|
from lbry.wallet.server.db.common import ResolveResult
|
||||||
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
|
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
|
||||||
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
||||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
|
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
|
||||||
|
@ -75,28 +76,6 @@ class FlushData:
|
||||||
undo = attr.ib()
|
undo = attr.ib()
|
||||||
|
|
||||||
|
|
||||||
class ResolveResult(typing.NamedTuple):
|
|
||||||
name: str
|
|
||||||
claim_hash: bytes
|
|
||||||
tx_num: int
|
|
||||||
position: int
|
|
||||||
tx_hash: bytes
|
|
||||||
height: int
|
|
||||||
amount: int
|
|
||||||
short_url: str
|
|
||||||
is_controlling: bool
|
|
||||||
canonical_url: str
|
|
||||||
creation_height: int
|
|
||||||
activation_height: int
|
|
||||||
expiration_height: int
|
|
||||||
effective_amount: int
|
|
||||||
support_amount: int
|
|
||||||
last_takeover_height: Optional[int]
|
|
||||||
claims_in_channel: Optional[int]
|
|
||||||
channel_hash: Optional[bytes]
|
|
||||||
reposted_claim_hash: Optional[bytes]
|
|
||||||
|
|
||||||
|
|
||||||
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
|
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
|
||||||
|
|
||||||
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')
|
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')
|
||||||
|
@ -259,9 +238,9 @@ class LevelDB:
|
||||||
# winning resolution
|
# winning resolution
|
||||||
controlling = self.get_controlling_claim(normalized_name)
|
controlling = self.get_controlling_claim(normalized_name)
|
||||||
if not controlling:
|
if not controlling:
|
||||||
print("none controlling")
|
print(f"none controlling for lbry://{normalized_name}")
|
||||||
return
|
return
|
||||||
print("resolved controlling", controlling.claim_hash.hex())
|
print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}")
|
||||||
return self._fs_get_claim_by_hash(controlling.claim_hash)
|
return self._fs_get_claim_by_hash(controlling.claim_hash)
|
||||||
|
|
||||||
amount_order = max(int(amount_order or 1), 1)
|
amount_order = max(int(amount_order or 1), 1)
|
||||||
|
|
|
@ -1035,7 +1035,6 @@ class LBRYElectrumX(SessionBase):
|
||||||
async def claimtrie_resolve(self, *urls):
|
async def claimtrie_resolve(self, *urls):
|
||||||
rows, extra = [], []
|
rows, extra = [], []
|
||||||
for url in urls:
|
for url in urls:
|
||||||
print("resolve", url)
|
|
||||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
self.session_mgr.urls_to_resolve_count_metric.inc()
|
||||||
stream, channel = await self.db.fs_resolve(url)
|
stream, channel = await self.db.fs_resolve(url)
|
||||||
self.session_mgr.resolved_url_count_metric.inc()
|
self.session_mgr.resolved_url_count_metric.inc()
|
||||||
|
@ -1071,7 +1070,6 @@ class LBRYElectrumX(SessionBase):
|
||||||
if not stream:
|
if not stream:
|
||||||
stream = LookupError(f"Could not find claim at {claim_id}")
|
stream = LookupError(f"Could not find claim at {claim_id}")
|
||||||
rows.append(stream)
|
rows.append(stream)
|
||||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
|
||||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
return Outputs.to_base64(rows, extra, 0, None, None)
|
||||||
|
|
||||||
def assert_tx_hash(self, value):
|
def assert_tx_hash(self, value):
|
||||||
|
|
Loading…
Reference in a new issue