From a919a3a519eb47739cbac5cb956f0755cf6d662b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 18 May 2022 13:34:36 -0400 Subject: [PATCH] move search.py to herald --- hub/common.py | 359 +++++++++++++++++++++++++ hub/{elastic_sync => }/constants.py | 0 hub/db/common.py | 16 -- hub/db/db.py | 4 +- hub/elastic_sync/service.py | 4 +- hub/{elastic_sync => herald}/search.py | 241 +---------------- hub/herald/session.py | 2 +- 7 files changed, 366 insertions(+), 260 deletions(-) rename hub/{elastic_sync => }/constants.py (100%) rename hub/{elastic_sync => herald}/search.py (55%) diff --git a/hub/common.py b/hub/common.py index 32df6dd..172a3b5 100644 --- a/hub/common.py +++ b/hub/common.py @@ -1,3 +1,4 @@ +import struct import hashlib import hmac import ipaddress @@ -5,8 +6,13 @@ import logging import logging.handlers import typing import collections +from decimal import Decimal +from typing import Iterable from asyncio import get_event_loop, Event from prometheus_client import Counter +from hub.schema.tags import clean_tags +from hub.schema.url import normalize_name +from hub.error import TooManyClaimSearchParametersError log = logging.getLogger(__name__) @@ -23,6 +29,22 @@ HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) +CLAIM_TYPES = { + 'stream': 1, + 'channel': 2, + 'repost': 3, + 'collection': 4, +} + +STREAM_TYPES = { + 'video': 1, + 'audio': 2, + 'image': 3, + 'document': 4, + 'binary': 5, + 'model': 6, +} + def setup_logging(log_path: str): log = logging.getLogger('scribe') @@ -404,3 +426,340 @@ class IndexVersionMismatch(Exception): def __init__(self, got_version, expected_version): self.got_version = got_version self.expected_version = expected_version + + +# Elasticsearch constants + +INDEX_DEFAULT_SETTINGS = { + "settings": + {"analysis": + {"analyzer": { + "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, + "index": + {"refresh_interval": -1, + "number_of_shards": 1, + "number_of_replicas": 0, + "sort": { + "field": ["trending_score", "release_time"], + "order": ["desc", "desc"] + }} + }, + "mappings": { + "properties": { + "claim_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text", + "index_prefixes": { + "min_chars": 1, + "max_chars": 10 + } + }, + "sd_hash": { + "fields": { + "keyword": { + "ignore_above": 96, + "type": "keyword" + } + }, + "type": "text", + "index_prefixes": { + "min_chars": 1, + "max_chars": 4 + } + }, + "height": {"type": "integer"}, + "claim_type": {"type": "byte"}, + "censor_type": {"type": "byte"}, + "trending_score": {"type": "double"}, + "release_time": {"type": "long"} + } + } +} + +FIELDS = { + '_id', + 'claim_id', 'claim_type', 'claim_name', 'normalized_name', + 'tx_id', 'tx_nout', 'tx_position', + 'short_url', 'canonical_url', + 'is_controlling', 'last_take_over_height', + 'public_key_bytes', 'public_key_id', 'claims_in_channel', + 'channel_id', 'signature', 'signature_digest', 'is_signature_valid', + 'amount', 'effective_amount', 'support_amount', + 'fee_amount', 'fee_currency', + 'height', 'creation_height', 'activation_height', 'expiration_height', + 'stream_type', 'media_type', 'censor_type', + 'title', 'author', 'description', + 'timestamp', 'creation_timestamp', + 'duration', 'release_time', + 'tags', 'languages', 'has_source', 'reposted_claim_type', + 'reposted_claim_id', 'repost_count', 'sd_hash', + 'trending_score', 'tx_num', + 'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id', + 'reposted_tx_position', 'reposted_height', +} + +TEXT_FIELDS = { + 'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', + 'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', + 'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', + 'tags', 'sd_hash', 'channel_tx_id', 'reposted_tx_id', +} + +RANGE_FIELDS = { + 'height', 'creation_height', 'activation_height', 'expiration_height', + 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', + 'tx_position', 'repost_count', 'limit_claims_per_channel', + 'amount', 'effective_amount', 'support_amount', + 'trending_score', 'censor_type', 'tx_num', 'reposted_tx_position', 'reposted_height', + 'channel_tx_position', 'channel_height', +} + +ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS + +REPLACEMENTS = { + 'claim_name': 'normalized_name', + 'name': 'normalized_name', + 'txid': 'tx_id', + 'nout': 'tx_nout', + 'trending_group': 'trending_score', + 'trending_mixed': 'trending_score', + 'trending_global': 'trending_score', + 'trending_local': 'trending_score', + 'reposted': 'repost_count', + 'stream_types': 'stream_type', + 'media_types': 'media_type', + 'valid_channel_signature': 'is_signature_valid' +} + + +def expand_query(**kwargs): + if "amount_order" in kwargs: + kwargs["limit"] = 1 + kwargs["order_by"] = "effective_amount" + kwargs["offset"] = int(kwargs["amount_order"]) - 1 + if 'name' in kwargs: + kwargs['name'] = normalize_name(kwargs.pop('name')) + if kwargs.get('is_controlling') is False: + kwargs.pop('is_controlling') + query = {'must': [], 'must_not': []} + collapse = None + if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None: + kwargs['fee_currency'] = kwargs['fee_currency'].upper() + for key, value in kwargs.items(): + key = key.replace('claim.', '') + many = key.endswith('__in') or isinstance(value, list) + if many and len(value) > 2048: + raise TooManyClaimSearchParametersError(key, 2048) + if many: + key = key.replace('__in', '') + value = list(filter(None, value)) + if value is None or isinstance(value, list) and len(value) == 0: + continue + key = REPLACEMENTS.get(key, key) + if key in FIELDS: + partial_id = False + if key == 'claim_type': + if isinstance(value, str): + value = CLAIM_TYPES[value] + else: + value = [CLAIM_TYPES[claim_type] for claim_type in value] + elif key == 'stream_type': + value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value)) + if key == '_id': + if isinstance(value, Iterable): + value = [item[::-1].hex() for item in value] + else: + value = value[::-1].hex() + if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20: + partial_id = True + if key in ('signature_valid', 'has_source'): + continue # handled later + if key in TEXT_FIELDS: + key += '.keyword' + ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} + if partial_id: + query['must'].append({"prefix": {key: value}}) + elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops: + operator_length = 2 if value[:2] in ops else 1 + operator, value = value[:operator_length], value[operator_length:] + if key == 'fee_amount': + value = str(Decimal(value)*1000) + query['must'].append({"range": {key: {ops[operator]: value}}}) + elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value): + range_constraints = [] + release_times = [] + for v in value: + operator_length = 2 if v[:2] in ops else 1 + operator, stripped_op_v = v[:operator_length], v[operator_length:] + if key == 'fee_amount': + stripped_op_v = str(Decimal(stripped_op_v)*1000) + if key == 'release_time': + release_times.append((operator, stripped_op_v)) + else: + range_constraints.append((operator, stripped_op_v)) + if key != 'release_time': + query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}}) + else: + query['must'].append( + {"bool": + {"should": [ + {"bool": { + "must_not": { + "exists": { + "field": "release_time" + } + } + }}, + {"bool": { + "must": [ + {"exists": {"field": "release_time"}}, + {'range': {key: {ops[operator]: v for operator, v in release_times}}}, + ]}}, + ]} + } + ) + elif many: + query['must'].append({"terms": {key: value}}) + else: + if key == 'fee_amount': + value = str(Decimal(value)*1000) + query['must'].append({"term": {key: {"value": value}}}) + elif key == 'not_channel_ids': + for channel_id in value: + query['must_not'].append({"term": {'channel_id.keyword': channel_id}}) + query['must_not'].append({"term": {'_id': channel_id}}) + elif key == 'channel_ids': + query['must'].append({"terms": {'channel_id.keyword': value}}) + elif key == 'claim_ids': + query['must'].append({"terms": {'claim_id.keyword': value}}) + elif key == 'media_types': + query['must'].append({"terms": {'media_type.keyword': value}}) + elif key == 'any_languages': + query['must'].append({"terms": {'languages': clean_tags(value)}}) + elif key == 'any_languages': + query['must'].append({"terms": {'languages': value}}) + elif key == 'all_languages': + query['must'].extend([{"term": {'languages': tag}} for tag in value]) + elif key == 'any_tags': + query['must'].append({"terms": {'tags.keyword': clean_tags(value)}}) + elif key == 'all_tags': + query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) + elif key == 'not_tags': + query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) + elif key == 'not_claim_id': + query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value]) + elif key == 'limit_claims_per_channel': + collapse = ('channel_id.keyword', value) + if kwargs.get('has_channel_signature'): + query['must'].append({"exists": {"field": "signature"}}) + if 'signature_valid' in kwargs: + query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) + elif 'signature_valid' in kwargs: + query['must'].append( + {"bool": + {"should": [ + {"bool": {"must_not": {"exists": {"field": "signature"}}}}, + {"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}} + ]} + } + ) + if 'has_source' in kwargs: + is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}} + query['must'].append( + {"bool": + {"should": [ + {"bool": # when is_stream_or_repost AND has_source + {"must": [ + {"match": {"has_source": kwargs['has_source']}}, + is_stream_or_repost_terms, + ] + }, + }, + {"bool": # when not is_stream_or_repost + {"must_not": is_stream_or_repost_terms} + }, + {"bool": # when reposted_claim_type wouldn't have source + {"must_not": + [ + {"term": {"reposted_claim_type": CLAIM_TYPES['stream']}} + ], + "must": + [ + {"term": {"claim_type": CLAIM_TYPES['repost']}} + ] + } + } + ]} + } + ) + if kwargs.get('text'): + query['must'].append( + {"simple_query_string": + {"query": kwargs["text"], "fields": [ + "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5" + ]}}) + query = { + "_source": {"excludes": ["description", "title"]}, + 'query': {'bool': query}, + "sort": [], + } + if "limit" in kwargs: + query["size"] = kwargs["limit"] + if 'offset' in kwargs: + query["from"] = kwargs["offset"] + if 'order_by' in kwargs: + if isinstance(kwargs["order_by"], str): + kwargs["order_by"] = [kwargs["order_by"]] + for value in kwargs['order_by']: + if 'trending_group' in value: + # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. + continue + is_asc = value.startswith('^') + value = value[1:] if is_asc else value + value = REPLACEMENTS.get(value, value) + if value in TEXT_FIELDS: + value += '.keyword' + query['sort'].append({value: "asc" if is_asc else "desc"}) + if collapse: + query["collapse"] = { + "field": collapse[0], + "inner_hits": { + "name": collapse[0], + "size": collapse[1], + "sort": query["sort"] + } + } + return query + + +def expand_result(results): + inner_hits = [] + expanded = [] + for result in results: + if result.get("inner_hits"): + for _, inner_hit in result["inner_hits"].items(): + inner_hits.extend(inner_hit["hits"]["hits"]) + continue + result = result['_source'] + result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1] + if result['reposted_claim_id']: + result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1] + else: + result['reposted_claim_hash'] = None + result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None + result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack(' 2048: - raise TooManyClaimSearchParametersError(key, 2048) - if many: - key = key.replace('__in', '') - value = list(filter(None, value)) - if value is None or isinstance(value, list) and len(value) == 0: - continue - key = REPLACEMENTS.get(key, key) - if key in FIELDS: - partial_id = False - if key == 'claim_type': - if isinstance(value, str): - value = CLAIM_TYPES[value] - else: - value = [CLAIM_TYPES[claim_type] for claim_type in value] - elif key == 'stream_type': - value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value)) - if key == '_id': - if isinstance(value, Iterable): - value = [item[::-1].hex() for item in value] - else: - value = value[::-1].hex() - if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20: - partial_id = True - if key in ('signature_valid', 'has_source'): - continue # handled later - if key in TEXT_FIELDS: - key += '.keyword' - ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} - if partial_id: - query['must'].append({"prefix": {key: value}}) - elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops: - operator_length = 2 if value[:2] in ops else 1 - operator, value = value[:operator_length], value[operator_length:] - if key == 'fee_amount': - value = str(Decimal(value)*1000) - query['must'].append({"range": {key: {ops[operator]: value}}}) - elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value): - range_constraints = [] - release_times = [] - for v in value: - operator_length = 2 if v[:2] in ops else 1 - operator, stripped_op_v = v[:operator_length], v[operator_length:] - if key == 'fee_amount': - stripped_op_v = str(Decimal(stripped_op_v)*1000) - if key == 'release_time': - release_times.append((operator, stripped_op_v)) - else: - range_constraints.append((operator, stripped_op_v)) - if key != 'release_time': - query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}}) - else: - query['must'].append( - {"bool": - {"should": [ - {"bool": { - "must_not": { - "exists": { - "field": "release_time" - } - } - }}, - {"bool": { - "must": [ - {"exists": {"field": "release_time"}}, - {'range': {key: {ops[operator]: v for operator, v in release_times}}}, - ]}}, - ]} - } - ) - elif many: - query['must'].append({"terms": {key: value}}) - else: - if key == 'fee_amount': - value = str(Decimal(value)*1000) - query['must'].append({"term": {key: {"value": value}}}) - elif key == 'not_channel_ids': - for channel_id in value: - query['must_not'].append({"term": {'channel_id.keyword': channel_id}}) - query['must_not'].append({"term": {'_id': channel_id}}) - elif key == 'channel_ids': - query['must'].append({"terms": {'channel_id.keyword': value}}) - elif key == 'claim_ids': - query['must'].append({"terms": {'claim_id.keyword': value}}) - elif key == 'media_types': - query['must'].append({"terms": {'media_type.keyword': value}}) - elif key == 'any_languages': - query['must'].append({"terms": {'languages': clean_tags(value)}}) - elif key == 'any_languages': - query['must'].append({"terms": {'languages': value}}) - elif key == 'all_languages': - query['must'].extend([{"term": {'languages': tag}} for tag in value]) - elif key == 'any_tags': - query['must'].append({"terms": {'tags.keyword': clean_tags(value)}}) - elif key == 'all_tags': - query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) - elif key == 'not_tags': - query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) - elif key == 'not_claim_id': - query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value]) - elif key == 'limit_claims_per_channel': - collapse = ('channel_id.keyword', value) - if kwargs.get('has_channel_signature'): - query['must'].append({"exists": {"field": "signature"}}) - if 'signature_valid' in kwargs: - query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) - elif 'signature_valid' in kwargs: - query['must'].append( - {"bool": - {"should": [ - {"bool": {"must_not": {"exists": {"field": "signature"}}}}, - {"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}} - ]} - } - ) - if 'has_source' in kwargs: - is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}} - query['must'].append( - {"bool": - {"should": [ - {"bool": # when is_stream_or_repost AND has_source - {"must": [ - {"match": {"has_source": kwargs['has_source']}}, - is_stream_or_repost_terms, - ] - }, - }, - {"bool": # when not is_stream_or_repost - {"must_not": is_stream_or_repost_terms} - }, - {"bool": # when reposted_claim_type wouldn't have source - {"must_not": - [ - {"term": {"reposted_claim_type": CLAIM_TYPES['stream']}} - ], - "must": - [ - {"term": {"claim_type": CLAIM_TYPES['repost']}} - ] - } - } - ]} - } - ) - if kwargs.get('text'): - query['must'].append( - {"simple_query_string": - {"query": kwargs["text"], "fields": [ - "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5" - ]}}) - query = { - "_source": {"excludes": ["description", "title"]}, - 'query': {'bool': query}, - "sort": [], - } - if "limit" in kwargs: - query["size"] = kwargs["limit"] - if 'offset' in kwargs: - query["from"] = kwargs["offset"] - if 'order_by' in kwargs: - if isinstance(kwargs["order_by"], str): - kwargs["order_by"] = [kwargs["order_by"]] - for value in kwargs['order_by']: - if 'trending_group' in value: - # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. - continue - is_asc = value.startswith('^') - value = value[1:] if is_asc else value - value = REPLACEMENTS.get(value, value) - if value in TEXT_FIELDS: - value += '.keyword' - query['sort'].append({value: "asc" if is_asc else "desc"}) - if collapse: - query["collapse"] = { - "field": collapse[0], - "inner_hits": { - "name": collapse[0], - "size": collapse[1], - "sort": query["sort"] - } - } - return query - - -def expand_result(results): - inner_hits = [] - expanded = [] - for result in results: - if result.get("inner_hits"): - for _, inner_hit in result["inner_hits"].items(): - inner_hits.extend(inner_hit["hits"]["hits"]) - continue - result = result['_source'] - result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1] - if result['reposted_claim_id']: - result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1] - else: - result['reposted_claim_hash'] = None - result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None - result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack('