ES: all _hash to _id

Victor Shyba, 2021-07-20 18:09:39 -03:00 (committed by Victor Shyba)
parent fe4b07b8ae
commit a533cda6f0
4 changed files with 21 additions and 21 deletions
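
In short: the Elasticsearch documents and queries stop carrying raw `_hash` values where a hex id will do. `censoring_channel_hash` becomes `censoring_channel_id`, `public_key_hash` becomes `public_key_id`, `reposted_claim_hash` is dropped from the indexed fields, a new `ALL_FIELDS` union whitelists what `extract_doc` may emit, and the censoring-channel hash-to-id conversion moves into the SQL writer so the index only ever sees hex ids.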


@@ -38,13 +38,13 @@ INDEX_DEFAULT_SETTINGS = {
 FIELDS = {'is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount',
           'timestamp', 'creation_timestamp', 'height', 'creation_height', 'activation_height', 'expiration_height',
           'release_time', 'short_url', 'canonical_url', 'title', 'author', 'description', 'claim_type', 'reposted',
-          'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'reposted_claim_hash', 'censor_type',
+          'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'censor_type',
           'claims_in_channel', 'channel_join', 'signature_valid', 'effective_amount', 'support_amount',
           'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'channel_id', 'tx_id', 'tx_nout',
-          'signature', 'signature_digest', 'public_key_bytes', 'public_key_hash', 'public_key_id', '_id', 'tags',
+          'signature', 'signature_digest', 'public_key_bytes', 'public_key_id', '_id', 'tags', 'censoring_channel_id',
           'reposted_claim_id'}
 TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id',
-               'media_type', 'normalized', 'public_key_bytes', 'public_key_hash', 'short_url', 'signature',
+               'media_type', 'normalized', 'public_key_bytes', 'public_key_id', 'short_url', 'signature',
                'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'}
 RANGE_FIELDS = {
     'height', 'creation_height', 'activation_height', 'expiration_height',
@@ -54,8 +54,8 @@ RANGE_FIELDS = {
     'trending_group', 'trending_mixed', 'censor_type',
     'trending_local', 'trending_global',
 }
+ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
 REPLACEMENTS = {
     'name': 'normalized',
-    'txid': 'tx_id',
-    'claim_hash': '_id'
+    'txid': 'tx_id'
 }
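
For context (not part of the diff): `REPLACEMENTS` maps user-facing search parameters onto indexed field names, and the new `ALL_FIELDS` union acts as a whitelist when documents are built. A minimal sketch of both roles, with the sets abbreviated:

    REPLACEMENTS = {'name': 'normalized', 'txid': 'tx_id'}
    ALL_FIELDS = {'normalized', 'tx_id', 'claim_id', 'censoring_channel_id'}  # abbreviated

    def rename_query_keys(params):
        # Translate incoming parameter names to the names stored in the index.
        return {REPLACEMENTS.get(key, key): value for key, value in params.items()}

    def whitelist_doc(doc):
        # Mirrors the filtering step this commit adds to extract_doc: anything
        # that is not a known index field is dropped before the bulk update.
        return {key: value for key, value in doc.items() if key in ALL_FIELDS}

    assert rename_query_keys({'name': 'cats', 'txid': 'ab'}) == {'normalized': 'cats', 'tx_id': 'ab'}
    assert whitelist_doc({'claim_id': 'ab', 'txo_hash': b'x'}) == {'claim_id': 'ab'}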


@@ -17,7 +17,7 @@ from lbry.schema.url import URL, normalize_name
 from lbry.utils import LRUCache
 from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
 from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
-    RANGE_FIELDS
+    RANGE_FIELDS, ALL_FIELDS
 from lbry.wallet.server.util import class_logger
@@ -133,7 +133,7 @@ class SearchIndex:
         update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
         key = 'channel_id' if channels else 'claim_id'
         update['script'] = {
-            "source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_hash=params[ctx._source.{key}]",
+            "source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_id=params[ctx._source.{key}]",
             "lang": "painless",
             "params": blockdict
         }
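
The script block above is attached to an update-by-query request, so every matching document gets its `censor_type` and `censoring_channel_id` rewritten in place, each document looking up its own censoring channel in `params`. A rough sketch of the resulting body (the query part stands in for `expand_query(...)` and the ids are made up):

    censor_type = 2
    blockdict = {'ab' * 20: 'cd' * 20}  # fake claim_id -> fake censoring channel id
    update = {
        'query': {'terms': {'claim_id': list(blockdict.keys())}},  # simplified stand-in
        'script': {
            'source': f'ctx._source.censor_type={censor_type}; '
                      'ctx._source.censoring_channel_id=params[ctx._source.claim_id]',
            'lang': 'painless',
            'params': blockdict,
        },
    }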
@@ -208,7 +208,7 @@ class SearchIndex:
         censored = [
             result if not isinstance(result, dict) or not censor.censor(result)
-            else ResolveCensoredError(url, result['censoring_channel_hash'])
+            else ResolveCensoredError(url, result['censoring_channel_id'])
             for url, result in zip(urls, results)
         ]
         return results, censored, censor
@@ -411,7 +411,7 @@ class SearchIndex:
         txo_rows = [row for row in txo_rows if isinstance(row, dict)]
         referenced_ids = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows)))
         referenced_ids |= set(filter(None, (row['channel_id'] for row in txo_rows)))
-        referenced_ids |= set(map(parse_claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows))))
+        referenced_ids |= set(map(parse_claim_id, filter(None, (row['censoring_channel_id'] for row in txo_rows))))
         referenced_txos = []
         if referenced_ids:
@@ -432,8 +432,7 @@ def extract_doc(doc, index):
     doc['reposted_claim_id'] = None
     channel_hash = doc.pop('channel_hash')
     doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
-    channel_hash = doc.pop('censoring_channel_hash')
-    doc['censoring_channel_hash'] = channel_hash[::-1].hex() if channel_hash else channel_hash
+    doc['censoring_channel_id'] = doc.get('censoring_channel_id')
     txo_hash = doc.pop('txo_hash')
     doc['tx_id'] = txo_hash[:32][::-1].hex()
     doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
@@ -441,11 +440,12 @@ def extract_doc(doc, index):
     doc['signature'] = (doc.pop('signature') or b'').hex() or None
     doc['signature_digest'] = (doc.pop('signature_digest') or b'').hex() or None
     doc['public_key_bytes'] = (doc.pop('public_key_bytes') or b'').hex() or None
-    doc['public_key_hash'] = (doc.pop('public_key_hash') or b'').hex() or None
+    doc['public_key_id'] = (doc.pop('public_key_hash') or b'').hex() or None
     doc['signature_valid'] = bool(doc['signature_valid'])
     doc['claim_type'] = doc.get('claim_type', 0) or 0
     doc['stream_type'] = int(doc.get('stream_type', 0) or 0)
     doc['has_source'] = bool(doc['has_source'])
+    doc = {key: value for key, value in doc.items() if key in ALL_FIELDS}
     return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', 'doc_as_upsert': True}
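
A side note on the `(doc.pop('public_key_hash') or b'').hex() or None` idiom carried over here: because `b''.hex()` is the empty string and both `b''` and `''` are falsy, a missing, `None`, or empty value collapses to `None` instead of an empty hex string. Illustrated:

    def hex_or_none(value):
        # b''.hex() == '' and '' is falsy, so None and b'' both come out as None.
        return (value or b'').hex() or None

    assert hex_or_none(None) is None
    assert hex_or_none(b'') is None
    assert hex_or_none(b'\x01\x02') == '0102'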
@@ -484,7 +484,6 @@ def expand_query(**kwargs):
         if not many and key in ('_id', 'claim_id') and len(value) < 20:
             partial_id = True
         if key == 'public_key_id':
-            key = 'public_key_hash'
             value = Base58.decode(value)[1:21].hex()
         if key == 'signature_valid':
             continue  # handled later
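
The dropped `key = 'public_key_hash'` line was a remap that is no longer needed: the index field is now itself called `public_key_id`, and the decoded query value already matches it. `Base58.decode(value)[1:21]` strips the leading version byte and keeps the 20-byte hash, which is then hex-encoded; a sketch of that slice over a fake decoded payload (version byte + 20-byte hash + 4-byte checksum is the usual base58check layout, assumed here for illustration):

    decoded = b'\x55' + bytes(range(20)) + b'\x00\x00\x00\x00'  # fake payload
    value = decoded[1:21].hex()
    assert len(value) == 40 and value.startswith('0001')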
@@ -607,8 +606,8 @@ def expand_result(results):
         result['channel_hash'] = unhexlify(result['channel_id'])[::-1] if result['channel_id'] else None
         result['txo_hash'] = unhexlify(result['tx_id'])[::-1] + struct.pack('<I', result['tx_nout'])
         result['tx_hash'] = unhexlify(result['tx_id'])[::-1]
-        if result['censoring_channel_hash']:
-            result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1]
+        if result['censoring_channel_id']:
+            result['censoring_channel_hash'] = unhexlify(result['censoring_channel_id'])[::-1]
         expanded.append(result)
     if inner_hits:
         return expand_result(inner_hits)
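
`expand_result` is the inverse of `extract_doc`: hex ids coming back from Elasticsearch are converted to the little-endian byte hashes the rest of the wallet server expects, and `censoring_channel_hash` is now reconstructed from the stored `censoring_channel_id`. The byte-order round trip, as a small worked example with a fake 20-byte hash:

    from binascii import unhexlify

    raw = bytes(range(20))                     # fake little-endian channel hash from the DB
    channel_id = raw[::-1].hex()               # indexing direction: reverse bytes, hex-encode
    assert len(channel_id) == 40
    assert unhexlify(channel_id)[::-1] == raw  # resolving direction restores the original bytes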


@@ -36,7 +36,7 @@ ORDER BY claim.height desc
         claim = dict(claim._asdict())
         claim['has_source'] = bool(claim.pop('reposted_has_source') or claim['has_source'])
         claim['censor_type'] = 0
-        claim['censoring_channel_hash'] = None
+        claim['censoring_channel_id'] = None
         claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
         claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
         if num % 10_000 == 0:


@@ -828,21 +828,22 @@ class SQLDB:
             claim = claim._asdict()
             id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
             claim['censor_type'] = 0
-            claim['censoring_channel_hash'] = None
+            censoring_channel_hash = None
             claim['has_source'] = bool(claim.pop('reposted_has_source') or claim['has_source'])
             for reason_id in id_set:
                 if reason_id in self.blocked_streams:
                     claim['censor_type'] = 2
-                    claim['censoring_channel_hash'] = self.blocked_streams.get(reason_id)
+                    censoring_channel_hash = self.blocked_streams.get(reason_id)
                 elif reason_id in self.blocked_channels:
                     claim['censor_type'] = 2
-                    claim['censoring_channel_hash'] = self.blocked_channels.get(reason_id)
+                    censoring_channel_hash = self.blocked_channels.get(reason_id)
                 elif reason_id in self.filtered_streams:
                     claim['censor_type'] = 1
-                    claim['censoring_channel_hash'] = self.filtered_streams.get(reason_id)
+                    censoring_channel_hash = self.filtered_streams.get(reason_id)
                 elif reason_id in self.filtered_channels:
                     claim['censor_type'] = 1
-                    claim['censoring_channel_hash'] = self.filtered_channels.get(reason_id)
+                    censoring_channel_hash = self.filtered_channels.get(reason_id)
+            claim['censoring_channel_id'] = censoring_channel_hash[::-1].hex() if censoring_channel_hash else None
             claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
             claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
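
Design-wise, this writer-side change keeps the raw bytes from the blocked/filtered maps out of the claim dict entirely: the hash is held in a local variable and converted to a hex `censoring_channel_id` exactly once, after the loop, which is what lets `extract_doc` drop its own `censoring_channel_hash` conversion above.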