move search.py to herald
This commit is contained in:
parent
37fc334c46
commit
a919a3a519
7 changed files with 366 additions and 260 deletions
359
hub/common.py
359
hub/common.py
|
@ -1,3 +1,4 @@
|
||||||
|
import struct
|
||||||
import hashlib
|
import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
import ipaddress
|
import ipaddress
|
||||||
|
@ -5,8 +6,13 @@ import logging
|
||||||
import logging.handlers
|
import logging.handlers
|
||||||
import typing
|
import typing
|
||||||
import collections
|
import collections
|
||||||
|
from decimal import Decimal
|
||||||
|
from typing import Iterable
|
||||||
from asyncio import get_event_loop, Event
|
from asyncio import get_event_loop, Event
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
from hub.schema.tags import clean_tags
|
||||||
|
from hub.schema.url import normalize_name
|
||||||
|
from hub.error import TooManyClaimSearchParametersError
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -23,6 +29,22 @@ HISTOGRAM_BUCKETS = (
|
||||||
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Numeric codes for each claim type name, numbered from 1 in declaration order.
CLAIM_TYPES = {
    name: code
    for code, name in enumerate(('stream', 'channel', 'repost', 'collection'), start=1)
}

# Numeric codes for each stream type name, numbered from 1 in declaration order.
STREAM_TYPES = {
    name: code
    for code, name in enumerate(('video', 'audio', 'image', 'document', 'binary', 'model'), start=1)
}
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(log_path: str):
|
def setup_logging(log_path: str):
|
||||||
log = logging.getLogger('scribe')
|
log = logging.getLogger('scribe')
|
||||||
|
@ -404,3 +426,340 @@ class IndexVersionMismatch(Exception):
|
||||||
def __init__(self, got_version, expected_version):
|
def __init__(self, got_version, expected_version):
|
||||||
self.got_version = got_version
|
self.got_version = got_version
|
||||||
self.expected_version = expected_version
|
self.expected_version = expected_version
|
||||||
|
|
||||||
|
|
||||||
|
# Elasticsearch constants
|
||||||
|
|
||||||
|
def _text_with_keyword_mapping(ignore_above, max_prefix_chars):
    """Return a ``text`` field mapping with a ``keyword`` sub-field and index prefixes.

    ``ignore_above`` is applied to the keyword sub-field; ``max_prefix_chars``
    is the upper bound of the ``index_prefixes`` range (the lower bound is 1).
    """
    return {
        "fields": {
            "keyword": {
                "ignore_above": ignore_above,
                "type": "keyword",
            },
        },
        "type": "text",
        "index_prefixes": {
            "min_chars": 1,
            "max_chars": max_prefix_chars,
        },
    }


# Settings and mappings body used when creating the search index.
INDEX_DEFAULT_SETTINGS = {
    "settings": {
        "analysis": {
            "analyzer": {
                "default": {
                    "tokenizer": "whitespace",
                    "filter": ["lowercase", "porter_stem"],
                },
            },
        },
        "index": {
            "refresh_interval": -1,
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "sort": {
                "field": ["trending_score", "release_time"],
                "order": ["desc", "desc"],
            },
        },
    },
    "mappings": {
        "properties": {
            "claim_id": _text_with_keyword_mapping(ignore_above=256, max_prefix_chars=10),
            "sd_hash": _text_with_keyword_mapping(ignore_above=96, max_prefix_chars=4),
            "height": {"type": "integer"},
            "claim_type": {"type": "byte"},
            "censor_type": {"type": "byte"},
            "trending_score": {"type": "double"},
            "release_time": {"type": "long"},
        },
    },
}
|
||||||
|
|
||||||
|
# Field names that expand_query treats as direct constraints on an indexed field.
FIELDS = {
    '_id',
    'activation_height',
    'amount',
    'author',
    'canonical_url',
    'censor_type',
    'channel_height',
    'channel_id',
    'channel_tx_id',
    'channel_tx_position',
    'claim_id',
    'claim_name',
    'claim_type',
    'claims_in_channel',
    'creation_height',
    'creation_timestamp',
    'description',
    'duration',
    'effective_amount',
    'expiration_height',
    'fee_amount',
    'fee_currency',
    'has_source',
    'height',
    'is_controlling',
    'is_signature_valid',
    'languages',
    'last_take_over_height',
    'media_type',
    'normalized_name',
    'public_key_bytes',
    'public_key_id',
    'release_time',
    'repost_count',
    'reposted_claim_id',
    'reposted_claim_type',
    'reposted_height',
    'reposted_tx_id',
    'reposted_tx_position',
    'sd_hash',
    'short_url',
    'signature',
    'signature_digest',
    'stream_type',
    'support_amount',
    'tags',
    'timestamp',
    'title',
    'trending_score',
    'tx_id',
    'tx_nout',
    'tx_num',
    'tx_position',
}

# Fields that are queried and sorted through their '.keyword' sub-field.
TEXT_FIELDS = {
    'author',
    'canonical_url',
    'censoring_channel_id',
    'channel_id',
    'channel_tx_id',
    'claim_id',
    'claim_name',
    'description',
    'fee_currency',
    'media_type',
    'normalized_name',
    'public_key_bytes',
    'public_key_id',
    'reposted_claim_id',
    'reposted_tx_id',
    'sd_hash',
    'short_url',
    'signature',
    'signature_digest',
    'tags',
    'title',
    'tx_id',
}

# Fields whose values may carry a '<', '<=', '>' or '>=' comparison prefix.
RANGE_FIELDS = {
    'activation_height',
    'amount',
    'censor_type',
    'channel_height',
    'channel_tx_position',
    'creation_height',
    'creation_timestamp',
    'duration',
    'effective_amount',
    'expiration_height',
    'fee_amount',
    'height',
    'limit_claims_per_channel',
    'release_time',
    'repost_count',
    'reposted_height',
    'reposted_tx_position',
    'support_amount',
    'timestamp',
    'trending_score',
    'tx_num',
    'tx_position',
}

# Union of every field name known to the three sets above.
ALL_FIELDS = FIELDS.union(TEXT_FIELDS, RANGE_FIELDS)
|
||||||
|
|
||||||
|
# Aliases accepted in search arguments, mapped to the field name they stand for.
REPLACEMENTS = {
    'claim_name': 'normalized_name',
    'name': 'normalized_name',
    'txid': 'tx_id',
    'nout': 'tx_nout',
    # every trending_* alias points at the single trending_score field
    **dict.fromkeys(
        ('trending_group', 'trending_mixed', 'trending_global', 'trending_local'),
        'trending_score',
    ),
    'reposted': 'repost_count',
    'stream_types': 'stream_type',
    'media_types': 'media_type',
    'valid_channel_signature': 'is_signature_valid',
}
|
||||||
|
|
||||||
|
|
||||||
|
def expand_query(**kwargs):
    """Build an Elasticsearch search body from claim-search keyword arguments.

    Each keyword is one of:

    * a field constraint: a name in ``FIELDS`` (optionally written with a
      ``claim.`` prefix, an ``__in`` suffix, or as an alias listed in
      ``REPLACEMENTS``). List values (or ``__in`` keys) become ``terms``
      clauses, scalars become ``term`` clauses, and short single values for
      ``_id`` / ``claim_id`` / ``sd_hash`` (fewer than 20 characters) become
      ``prefix`` clauses.
    * a range constraint: for names in ``RANGE_FIELDS``, a string starting
      with ``<``, ``<=``, ``>`` or ``>=`` (or a list of such strings).
    * one of the special filters handled explicitly below (``channel_ids``,
      ``not_channel_ids``, ``claim_ids``, ``any_tags``, ``all_tags``,
      ``not_tags``, ``any_languages``, ``all_languages``, ``not_claim_id``,
      ``limit_claims_per_channel``, ``has_channel_signature``,
      ``signature_valid``, ``has_source``, ``text``).
    * a paging / sorting control: ``limit``, ``offset``, ``order_by`` (a
      field name or list of names, ``^`` prefix for ascending) and
      ``amount_order`` (which overrides the other three).

    Keywords that match none of the above are ignored, as are ``None`` values
    and lists that are empty after dropping falsy items.

    Returns:
        dict: a search body with ``_source``, ``query`` and ``sort`` keys,
        plus ``size``, ``from`` and ``collapse`` when requested.

    Raises:
        TooManyClaimSearchParametersError: a list constraint has more than
            2048 items.
        KeyError: an unknown ``claim_type`` name, or an unknown single
            ``stream_type`` name, was given.
    """
    # Comparison prefixes accepted on range fields -> Elasticsearch range operators.
    ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}

    def is_range_constraint(candidate):
        # Slicing (instead of indexing) keeps empty strings from raising, and the
        # isinstance check keeps plain numbers from being treated as constraints.
        return isinstance(candidate, str) and candidate[:1] in ops

    def split_operator(constraint):
        # Two-character operators are checked first so '<=' is not read as '<'.
        operator_length = 2 if constraint[:2] in ops else 1
        return constraint[:operator_length], constraint[operator_length:]

    def scale_fee(amount):
        # fee_amount constraints are multiplied by 1000 before being compared.
        return str(Decimal(amount)*1000)

    if "amount_order" in kwargs:
        kwargs["limit"] = 1
        kwargs["order_by"] = "effective_amount"
        kwargs["offset"] = int(kwargs["amount_order"]) - 1
    if 'name' in kwargs:
        kwargs['name'] = normalize_name(kwargs.pop('name'))
    if kwargs.get('is_controlling') is False:
        kwargs.pop('is_controlling')
    if kwargs.get('fee_currency') is not None:
        kwargs['fee_currency'] = kwargs['fee_currency'].upper()

    query = {'must': [], 'must_not': []}
    collapse = None
    for key, value in kwargs.items():
        if value is None:
            # Checked first so that `some_field__in=None` is skipped instead of failing in len().
            continue
        key = key.replace('claim.', '')
        many = key.endswith('__in') or isinstance(value, list)
        if many:
            if len(value) > 2048:
                raise TooManyClaimSearchParametersError(key, 2048)
            key = key.replace('__in', '')
            value = list(filter(None, value))
            if not value:
                continue
        key = REPLACEMENTS.get(key, key)
        if key in FIELDS:
            if key == 'claim_type':
                if isinstance(value, str):
                    value = CLAIM_TYPES[value]
                else:
                    value = [CLAIM_TYPES[claim_type] for claim_type in value]
            elif key == 'stream_type':
                value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
            elif key == '_id':
                # A single hash is bytes, which is itself iterable (yielding ints), so it
                # has to be recognised before falling back to "collection of hashes".
                if isinstance(value, (bytes, bytearray)):
                    value = value[::-1].hex()
                else:
                    value = [item[::-1].hex() for item in value]
            partial_id = not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20
            if key in ('signature_valid', 'has_source'):
                continue  # handled later
            if key in TEXT_FIELDS:
                key += '.keyword'
            if partial_id:
                query['must'].append({"prefix": {key: value}})
            elif key in RANGE_FIELDS and is_range_constraint(value):
                operator, value = split_operator(value)
                if key == 'fee_amount':
                    value = scale_fee(value)
                query['must'].append({"range": {key: {ops[operator]: value}}})
            elif key in RANGE_FIELDS and isinstance(value, list) and all(map(is_range_constraint, value)):
                # When the same operator is given twice, the later constraint wins.
                constraints = {}
                for item in value:
                    operator, stripped = split_operator(item)
                    if key == 'fee_amount':
                        stripped = scale_fee(stripped)
                    constraints[ops[operator]] = stripped
                if key != 'release_time':
                    query['must'].append({"range": {key: constraints}})
                else:
                    # Claims without a release_time also match a release_time range.
                    query['must'].append(
                        {"bool":
                            {"should": [
                                {"bool": {
                                    "must_not": {
                                        "exists": {
                                            "field": "release_time"
                                        }
                                    }
                                }},
                                {"bool": {
                                    "must": [
                                        {"exists": {"field": "release_time"}},
                                        {'range': {key: constraints}},
                                    ]}},
                            ]}
                         }
                    )
            elif many or isinstance(value, list):
                # A value that was expanded into a list above (e.g. a single stream_type
                # name) must go through `terms`; `term` takes exactly one value.
                query['must'].append({"terms": {key: value}})
            else:
                if key == 'fee_amount':
                    value = scale_fee(value)
                query['must'].append({"term": {key: {"value": value}}})
        elif key == 'not_channel_ids':
            for channel_id in value:
                query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
                query['must_not'].append({"term": {'_id': channel_id}})
        elif key == 'channel_ids':
            query['must'].append({"terms": {'channel_id.keyword': value}})
        elif key == 'claim_ids':
            query['must'].append({"terms": {'claim_id.keyword': value}})
        elif key == 'media_types':
            query['must'].append({"terms": {'media_type.keyword': value}})
        elif key == 'any_languages':
            query['must'].append({"terms": {'languages': clean_tags(value)}})
        elif key == 'all_languages':
            query['must'].extend([{"term": {'languages': tag}} for tag in value])
        elif key == 'any_tags':
            query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
        elif key == 'all_tags':
            query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
        elif key == 'not_tags':
            query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
        elif key == 'not_claim_id':
            query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
        elif key == 'limit_claims_per_channel':
            collapse = ('channel_id.keyword', value)

    if kwargs.get('has_channel_signature'):
        query['must'].append({"exists": {"field": "signature"}})
        if 'signature_valid' in kwargs:
            query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
    elif 'signature_valid' in kwargs:
        # Without has_channel_signature, unsigned claims also pass the validity filter.
        query['must'].append(
            {"bool":
                {"should": [
                    {"bool": {"must_not": {"exists": {"field": "signature"}}}},
                    {"bool": {"must": {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}}
                ]}
             }
        )
    if 'has_source' in kwargs:
        is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
        query['must'].append(
            {"bool":
                {"should": [
                    {"bool":  # when is_stream_or_repost AND has_source
                        {"must": [
                            {"match": {"has_source": kwargs['has_source']}},
                            is_stream_or_repost_terms,
                        ]
                        },
                     },
                    {"bool":  # when not is_stream_or_repost
                        {"must_not": is_stream_or_repost_terms}
                     },
                    {"bool":  # when reposted_claim_type wouldn't have source
                        {"must_not":
                            [
                                {"term": {"reposted_claim_type": CLAIM_TYPES['stream']}}
                            ],
                         "must":
                            [
                                {"term": {"claim_type": CLAIM_TYPES['repost']}}
                            ]
                         }
                     }
                ]}
             }
        )
    if kwargs.get('text'):
        query['must'].append(
            {"simple_query_string":
                {"query": kwargs["text"], "fields": [
                    "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
                ]}})

    query = {
        "_source": {"excludes": ["description", "title"]},
        'query': {'bool': query},
        "sort": [],
    }
    if "limit" in kwargs:
        query["size"] = kwargs["limit"]
    if 'offset' in kwargs:
        query["from"] = kwargs["offset"]
    if 'order_by' in kwargs:
        if isinstance(kwargs["order_by"], str):
            kwargs["order_by"] = [kwargs["order_by"]]
        for value in kwargs['order_by']:
            if 'trending_group' in value:
                # fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
                continue
            is_asc = value.startswith('^')
            value = value[1:] if is_asc else value
            value = REPLACEMENTS.get(value, value)
            if value in TEXT_FIELDS:
                value += '.keyword'
            query['sort'].append({value: "asc" if is_asc else "desc"})
    if collapse:
        query["collapse"] = {
            "field": collapse[0],
            "inner_hits": {
                "name": collapse[0],
                "size": collapse[1],
                "sort": query["sort"]
            }
        }
    return query
|
||||||
|
|
||||||
|
|
||||||
|
def expand_result(results):
    """Turn raw search hits into claim dicts with binary hash fields added.

    For every hit without ``inner_hits`` the ``_source`` dict is updated in
    place: byte-reversed ``claim_hash``, ``reposted_claim_hash``,
    ``channel_hash``, ``txo_hash`` and ``tx_hash`` are derived from the hex id
    fields, ``repost_count`` is renamed to ``reposted`` and
    ``is_signature_valid`` to ``signature_valid``.

    If any hit carries ``inner_hits``, the nested hits of all such hits are
    expanded instead and only those are returned.
    """
    def from_reversed_hex(hex_string):
        return bytes.fromhex(hex_string)[::-1]

    nested_hits = []
    claims = []
    for hit in results:
        if hit.get("inner_hits"):
            for group in hit["inner_hits"].values():
                nested_hits.extend(group["hits"]["hits"])
            continue
        claim = hit['_source']
        claim['claim_hash'] = from_reversed_hex(claim['claim_id'])
        reposted_claim_id = claim['reposted_claim_id']
        claim['reposted_claim_hash'] = from_reversed_hex(reposted_claim_id) if reposted_claim_id else None
        channel_id = claim['channel_id']
        claim['channel_hash'] = from_reversed_hex(channel_id) if channel_id else None
        tx_hash = from_reversed_hex(claim['tx_id'])
        # txo_hash is the tx hash followed by the output index as a little-endian uint32
        claim['txo_hash'] = tx_hash + struct.pack('<I', claim['tx_nout'])
        claim['tx_hash'] = tx_hash
        claim['reposted'] = claim.pop('repost_count')
        claim['signature_valid'] = claim.pop('is_signature_valid')
        claims.append(claim)
    return expand_result(nested_hits) if nested_hits else claims
|
||||||
|
|
|
@ -53,22 +53,6 @@ class DB_PREFIXES(enum.Enum):
|
||||||
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass
|
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass
|
||||||
|
|
||||||
|
|
||||||
CLAIM_TYPES = {
|
|
||||||
'stream': 1,
|
|
||||||
'channel': 2,
|
|
||||||
'repost': 3,
|
|
||||||
'collection': 4,
|
|
||||||
}
|
|
||||||
|
|
||||||
STREAM_TYPES = {
|
|
||||||
'video': 1,
|
|
||||||
'audio': 2,
|
|
||||||
'image': 3,
|
|
||||||
'document': 4,
|
|
||||||
'binary': 5,
|
|
||||||
'model': 6,
|
|
||||||
}
|
|
||||||
|
|
||||||
# 9/21/2020
|
# 9/21/2020
|
||||||
MOST_USED_TAGS = {
|
MOST_USED_TAGS = {
|
||||||
"gaming",
|
"gaming",
|
||||||
|
|
|
@ -18,9 +18,9 @@ from hub.schema.url import URL, normalize_name
|
||||||
from hub.schema.claim import guess_stream_type
|
from hub.schema.claim import guess_stream_type
|
||||||
from hub.schema.result import Censor
|
from hub.schema.result import Censor
|
||||||
from hub.scribe.transaction import TxInput
|
from hub.scribe.transaction import TxInput
|
||||||
from hub.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256
|
from hub.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256, STREAM_TYPES, CLAIM_TYPES
|
||||||
from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
|
from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
|
||||||
from hub.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO
|
from hub.db.common import ResolveResult,ExpandedResolveResult, DBError, UTXO
|
||||||
from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
|
from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
|
||||||
from hub.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
|
from hub.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
|
||||||
from hub.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
|
from hub.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
|
||||||
|
|
|
@ -7,12 +7,10 @@ from elasticsearch import AsyncElasticsearch, NotFoundError
|
||||||
from elasticsearch.helpers import async_streaming_bulk
|
from elasticsearch.helpers import async_streaming_bulk
|
||||||
from hub.schema.result import Censor
|
from hub.schema.result import Censor
|
||||||
from hub.service import BlockchainReaderService
|
from hub.service import BlockchainReaderService
|
||||||
from hub.common import IndexVersionMismatch
|
from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query
|
||||||
from hub.db.revertable import RevertableOp
|
from hub.db.revertable import RevertableOp
|
||||||
from hub.db.common import TrendingNotification, DB_PREFIXES
|
from hub.db.common import TrendingNotification, DB_PREFIXES
|
||||||
from hub.notifier_protocol import ElasticNotifierProtocol
|
from hub.notifier_protocol import ElasticNotifierProtocol
|
||||||
from hub.elastic_sync.search import expand_query
|
|
||||||
from hub.elastic_sync.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS
|
|
||||||
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
|
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from hub.elastic_sync.env import ElasticEnv
|
from hub.elastic_sync.env import ElasticEnv
|
||||||
|
|
|
@ -1,20 +1,13 @@
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import struct
|
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from collections import Counter, deque
|
from collections import Counter, deque
|
||||||
from decimal import Decimal
|
|
||||||
from operator import itemgetter
|
from operator import itemgetter
|
||||||
from typing import Optional, List, Iterable, TYPE_CHECKING
|
from typing import Optional, List, TYPE_CHECKING
|
||||||
|
|
||||||
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
||||||
from hub.schema.result import Censor, Outputs
|
from hub.schema.result import Censor, Outputs
|
||||||
from hub.schema.tags import clean_tags
|
from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
|
||||||
from hub.schema.url import normalize_name
|
|
||||||
from hub.error import TooManyClaimSearchParametersError
|
|
||||||
from hub.common import LRUCache, IndexVersionMismatch
|
|
||||||
from hub.db.common import CLAIM_TYPES, STREAM_TYPES
|
|
||||||
from hub.elastic_sync.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS
|
|
||||||
from hub.db.common import ResolveResult
|
from hub.db.common import ResolveResult
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from hub.db import HubDB
|
from hub.db import HubDB
|
||||||
|
@ -85,7 +78,7 @@ class SearchIndex:
|
||||||
self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
|
self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
|
||||||
raise IndexVersionMismatch(index_version, self.VERSION)
|
raise IndexVersionMismatch(index_version, self.VERSION)
|
||||||
await self.sync_client.indices.refresh(self.index)
|
await self.sync_client.indices.refresh(self.index)
|
||||||
return acked
|
return True
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
clients = [c for c in (self.sync_client, self.search_client) if c is not None]
|
clients = [c for c in (self.sync_client, self.search_client) if c is not None]
|
||||||
|
@ -291,234 +284,6 @@ class SearchIndex:
|
||||||
return referenced_txos
|
return referenced_txos
|
||||||
|
|
||||||
|
|
||||||
def expand_query(**kwargs):
|
|
||||||
if "amount_order" in kwargs:
|
|
||||||
kwargs["limit"] = 1
|
|
||||||
kwargs["order_by"] = "effective_amount"
|
|
||||||
kwargs["offset"] = int(kwargs["amount_order"]) - 1
|
|
||||||
if 'name' in kwargs:
|
|
||||||
kwargs['name'] = normalize_name(kwargs.pop('name'))
|
|
||||||
if kwargs.get('is_controlling') is False:
|
|
||||||
kwargs.pop('is_controlling')
|
|
||||||
query = {'must': [], 'must_not': []}
|
|
||||||
collapse = None
|
|
||||||
if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None:
|
|
||||||
kwargs['fee_currency'] = kwargs['fee_currency'].upper()
|
|
||||||
for key, value in kwargs.items():
|
|
||||||
key = key.replace('claim.', '')
|
|
||||||
many = key.endswith('__in') or isinstance(value, list)
|
|
||||||
if many and len(value) > 2048:
|
|
||||||
raise TooManyClaimSearchParametersError(key, 2048)
|
|
||||||
if many:
|
|
||||||
key = key.replace('__in', '')
|
|
||||||
value = list(filter(None, value))
|
|
||||||
if value is None or isinstance(value, list) and len(value) == 0:
|
|
||||||
continue
|
|
||||||
key = REPLACEMENTS.get(key, key)
|
|
||||||
if key in FIELDS:
|
|
||||||
partial_id = False
|
|
||||||
if key == 'claim_type':
|
|
||||||
if isinstance(value, str):
|
|
||||||
value = CLAIM_TYPES[value]
|
|
||||||
else:
|
|
||||||
value = [CLAIM_TYPES[claim_type] for claim_type in value]
|
|
||||||
elif key == 'stream_type':
|
|
||||||
value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
|
|
||||||
if key == '_id':
|
|
||||||
if isinstance(value, Iterable):
|
|
||||||
value = [item[::-1].hex() for item in value]
|
|
||||||
else:
|
|
||||||
value = value[::-1].hex()
|
|
||||||
if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20:
|
|
||||||
partial_id = True
|
|
||||||
if key in ('signature_valid', 'has_source'):
|
|
||||||
continue # handled later
|
|
||||||
if key in TEXT_FIELDS:
|
|
||||||
key += '.keyword'
|
|
||||||
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
|
|
||||||
if partial_id:
|
|
||||||
query['must'].append({"prefix": {key: value}})
|
|
||||||
elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
|
|
||||||
operator_length = 2 if value[:2] in ops else 1
|
|
||||||
operator, value = value[:operator_length], value[operator_length:]
|
|
||||||
if key == 'fee_amount':
|
|
||||||
value = str(Decimal(value)*1000)
|
|
||||||
query['must'].append({"range": {key: {ops[operator]: value}}})
|
|
||||||
elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value):
|
|
||||||
range_constraints = []
|
|
||||||
release_times = []
|
|
||||||
for v in value:
|
|
||||||
operator_length = 2 if v[:2] in ops else 1
|
|
||||||
operator, stripped_op_v = v[:operator_length], v[operator_length:]
|
|
||||||
if key == 'fee_amount':
|
|
||||||
stripped_op_v = str(Decimal(stripped_op_v)*1000)
|
|
||||||
if key == 'release_time':
|
|
||||||
release_times.append((operator, stripped_op_v))
|
|
||||||
else:
|
|
||||||
range_constraints.append((operator, stripped_op_v))
|
|
||||||
if key != 'release_time':
|
|
||||||
query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}})
|
|
||||||
else:
|
|
||||||
query['must'].append(
|
|
||||||
{"bool":
|
|
||||||
{"should": [
|
|
||||||
{"bool": {
|
|
||||||
"must_not": {
|
|
||||||
"exists": {
|
|
||||||
"field": "release_time"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}},
|
|
||||||
{"bool": {
|
|
||||||
"must": [
|
|
||||||
{"exists": {"field": "release_time"}},
|
|
||||||
{'range': {key: {ops[operator]: v for operator, v in release_times}}},
|
|
||||||
]}},
|
|
||||||
]}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
elif many:
|
|
||||||
query['must'].append({"terms": {key: value}})
|
|
||||||
else:
|
|
||||||
if key == 'fee_amount':
|
|
||||||
value = str(Decimal(value)*1000)
|
|
||||||
query['must'].append({"term": {key: {"value": value}}})
|
|
||||||
elif key == 'not_channel_ids':
|
|
||||||
for channel_id in value:
|
|
||||||
query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
|
|
||||||
query['must_not'].append({"term": {'_id': channel_id}})
|
|
||||||
elif key == 'channel_ids':
|
|
||||||
query['must'].append({"terms": {'channel_id.keyword': value}})
|
|
||||||
elif key == 'claim_ids':
|
|
||||||
query['must'].append({"terms": {'claim_id.keyword': value}})
|
|
||||||
elif key == 'media_types':
|
|
||||||
query['must'].append({"terms": {'media_type.keyword': value}})
|
|
||||||
elif key == 'any_languages':
|
|
||||||
query['must'].append({"terms": {'languages': clean_tags(value)}})
|
|
||||||
elif key == 'any_languages':
|
|
||||||
query['must'].append({"terms": {'languages': value}})
|
|
||||||
elif key == 'all_languages':
|
|
||||||
query['must'].extend([{"term": {'languages': tag}} for tag in value])
|
|
||||||
elif key == 'any_tags':
|
|
||||||
query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
|
|
||||||
elif key == 'all_tags':
|
|
||||||
query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
|
|
||||||
elif key == 'not_tags':
|
|
||||||
query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
|
|
||||||
elif key == 'not_claim_id':
|
|
||||||
query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
|
|
||||||
elif key == 'limit_claims_per_channel':
|
|
||||||
collapse = ('channel_id.keyword', value)
|
|
||||||
if kwargs.get('has_channel_signature'):
|
|
||||||
query['must'].append({"exists": {"field": "signature"}})
|
|
||||||
if 'signature_valid' in kwargs:
|
|
||||||
query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
|
|
||||||
elif 'signature_valid' in kwargs:
|
|
||||||
query['must'].append(
|
|
||||||
{"bool":
|
|
||||||
{"should": [
|
|
||||||
{"bool": {"must_not": {"exists": {"field": "signature"}}}},
|
|
||||||
{"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}}
|
|
||||||
]}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
if 'has_source' in kwargs:
|
|
||||||
is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
|
|
||||||
query['must'].append(
|
|
||||||
{"bool":
|
|
||||||
{"should": [
|
|
||||||
{"bool": # when is_stream_or_repost AND has_source
|
|
||||||
{"must": [
|
|
||||||
{"match": {"has_source": kwargs['has_source']}},
|
|
||||||
is_stream_or_repost_terms,
|
|
||||||
]
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{"bool": # when not is_stream_or_repost
|
|
||||||
{"must_not": is_stream_or_repost_terms}
|
|
||||||
},
|
|
||||||
{"bool": # when reposted_claim_type wouldn't have source
|
|
||||||
{"must_not":
|
|
||||||
[
|
|
||||||
{"term": {"reposted_claim_type": CLAIM_TYPES['stream']}}
|
|
||||||
],
|
|
||||||
"must":
|
|
||||||
[
|
|
||||||
{"term": {"claim_type": CLAIM_TYPES['repost']}}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
if kwargs.get('text'):
|
|
||||||
query['must'].append(
|
|
||||||
{"simple_query_string":
|
|
||||||
{"query": kwargs["text"], "fields": [
|
|
||||||
"claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
|
|
||||||
]}})
|
|
||||||
query = {
|
|
||||||
"_source": {"excludes": ["description", "title"]},
|
|
||||||
'query': {'bool': query},
|
|
||||||
"sort": [],
|
|
||||||
}
|
|
||||||
if "limit" in kwargs:
|
|
||||||
query["size"] = kwargs["limit"]
|
|
||||||
if 'offset' in kwargs:
|
|
||||||
query["from"] = kwargs["offset"]
|
|
||||||
if 'order_by' in kwargs:
|
|
||||||
if isinstance(kwargs["order_by"], str):
|
|
||||||
kwargs["order_by"] = [kwargs["order_by"]]
|
|
||||||
for value in kwargs['order_by']:
|
|
||||||
if 'trending_group' in value:
|
|
||||||
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
|
|
||||||
continue
|
|
||||||
is_asc = value.startswith('^')
|
|
||||||
value = value[1:] if is_asc else value
|
|
||||||
value = REPLACEMENTS.get(value, value)
|
|
||||||
if value in TEXT_FIELDS:
|
|
||||||
value += '.keyword'
|
|
||||||
query['sort'].append({value: "asc" if is_asc else "desc"})
|
|
||||||
if collapse:
|
|
||||||
query["collapse"] = {
|
|
||||||
"field": collapse[0],
|
|
||||||
"inner_hits": {
|
|
||||||
"name": collapse[0],
|
|
||||||
"size": collapse[1],
|
|
||||||
"sort": query["sort"]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return query
|
|
||||||
|
|
||||||
|
|
||||||
def expand_result(results):
|
|
||||||
inner_hits = []
|
|
||||||
expanded = []
|
|
||||||
for result in results:
|
|
||||||
if result.get("inner_hits"):
|
|
||||||
for _, inner_hit in result["inner_hits"].items():
|
|
||||||
inner_hits.extend(inner_hit["hits"]["hits"])
|
|
||||||
continue
|
|
||||||
result = result['_source']
|
|
||||||
result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1]
|
|
||||||
if result['reposted_claim_id']:
|
|
||||||
result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1]
|
|
||||||
else:
|
|
||||||
result['reposted_claim_hash'] = None
|
|
||||||
result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None
|
|
||||||
result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack('<I', result['tx_nout'])
|
|
||||||
result['tx_hash'] = bytes.fromhex(result['tx_id'])[::-1]
|
|
||||||
result['reposted'] = result.pop('repost_count')
|
|
||||||
result['signature_valid'] = result.pop('is_signature_valid')
|
|
||||||
# result['normalized'] = result.pop('normalized_name')
|
|
||||||
# if result['censoring_channel_hash']:
|
|
||||||
# result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1]
|
|
||||||
expanded.append(result)
|
|
||||||
if inner_hits:
|
|
||||||
return expand_result(inner_hits)
|
|
||||||
return expanded
|
|
||||||
|
|
||||||
|
|
||||||
class ResultCacheItem:
|
class ResultCacheItem:
|
||||||
__slots__ = '_result', 'lock', 'has_result'
|
__slots__ = '_result', 'lock', 'has_result'
|
||||||
|
|
|
@ -20,7 +20,7 @@ from hub.error import ResolveCensoredError, TooManyClaimSearchParametersError
|
||||||
from hub import __version__, PROMETHEUS_NAMESPACE
|
from hub import __version__, PROMETHEUS_NAMESPACE
|
||||||
from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
|
from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
|
||||||
from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
|
from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
|
||||||
from hub.elastic_sync.search import SearchIndex
|
from hub.herald.search import SearchIndex
|
||||||
from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time
|
from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time
|
||||||
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
|
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
|
||||||
from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
|
from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
|
||||||
|
|
Loading…
Reference in a new issue