# lbry-sdk/lbry/wallet/server/db/elasticsearch/search.py
# Elasticsearch-backed claim search and URL resolution for the wallet server.
import asyncio
import struct
2021-03-24 09:35:31 +01:00
from binascii import unhexlify
2021-04-23 05:50:35 +02:00
from collections import Counter, deque
from decimal import Decimal
from operator import itemgetter
2021-03-24 09:35:31 +01:00
from typing import Optional, List, Iterable, Union
2021-01-17 09:50:49 +01:00
2021-02-12 01:45:41 +01:00
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
2021-02-23 00:47:56 +01:00
from elasticsearch.helpers import async_streaming_bulk
2021-01-17 09:50:49 +01:00
from lbry.crypto.base58 import Base58
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
2021-01-20 05:20:50 +01:00
from lbry.schema.result import Outputs, Censor
from lbry.schema.tags import clean_tags
from lbry.schema.url import URL, normalize_name
2021-03-02 23:58:54 +01:00
from lbry.utils import LRUCache
from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
2021-07-20 23:09:39 +02:00
RANGE_FIELDS, ALL_FIELDS
2021-02-12 01:45:41 +01:00
from lbry.wallet.server.util import class_logger
2021-01-17 09:50:49 +01:00
2021-03-11 05:41:55 +01:00
class ChannelResolution(str):
    """Marker string subclass: a successfully resolved channel claim id."""

    @classmethod
    def lookup_error(cls, url):
        """Error to report when the channel part of *url* has no match."""
        return LookupError(f'Could not find channel in "{url}".')
2021-03-11 05:41:55 +01:00
class StreamResolution(str):
    """Marker string subclass: a successfully resolved stream claim id."""

    @classmethod
    def lookup_error(cls, url):
        """Error to report when the stream part of *url* has no match."""
        return LookupError(f'Could not find claim at "{url}".')
2021-03-11 05:41:55 +01:00
class IndexVersionMismatch(Exception):
    """Raised when the existing ES index template version does not match
    the version this code expects (SearchIndex.VERSION)."""

    def __init__(self, got_version, expected_version):
        # version found in the index template vs. the one we require
        self.got_version, self.expected_version = got_version, expected_version
class SearchIndex:
    """Asynchronous wrapper around an Elasticsearch claims index.

    Keeps two clients: a short-timeout one for interactive searches and a
    long-timeout one for bulk sync, plus LRU caches for hot lookups.
    """

    # Bump when the index mapping/settings change incompatibly.
    VERSION = 1

    def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
        self.search_timeout = search_timeout
        self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
        self.search_client: Optional[AsyncElasticsearch] = None
        self.sync_client: Optional[AsyncElasticsearch] = None
        self.index = index_prefix + 'claims'
        self.logger = class_logger(__name__, self.__class__.__name__)
        # claim_id -> expanded claim document
        self.claim_cache = LRUCache(2 ** 15)
        # "channel#name#short_id" -> full claim id
        self.short_id_cache = LRUCache(2 ** 17)
        # stringified search kwargs -> ResultCacheItem
        self.search_cache = LRUCache(2 ** 17)
        # raw url -> resolution outcome (claim id string or error)
        self.resolution_cache = LRUCache(2 ** 17)
        self._elastic_host = elastic_host
        self._elastic_port = elastic_port
2021-01-17 09:50:49 +01:00
async def get_index_version(self) -> int:
    """Return the version stored in the index template, or 0 when the
    template does not exist yet."""
    try:
        template = await self.sync_client.indices.get_template(self.index)
    except NotFoundError:
        return 0
    return template[self.index]['version']
async def set_index_version(self, version):
    """Persist *version* into the index template (400 = already exists is ignored)."""
    body = {'version': version, 'index_patterns': ['ignored']}
    await self.sync_client.indices.put_template(self.index, body=body, ignore=400)
2021-05-12 05:21:03 +02:00
async def start(self) -> bool:
    """Connect both clients and ensure the index exists.

    Returns True only when this call created the index; False when the
    clients were already started or the index already existed.
    Raises IndexVersionMismatch if an existing index reports a different
    version than SearchIndex.VERSION.
    """
    if self.sync_client:
        return False
    hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
    self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
    self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
    # block until the cluster reaches at least 'yellow' (primaries allocated)
    while True:
        try:
            await self.sync_client.cluster.health(wait_for_status='yellow')
            break
        except ConnectionError:
            self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!")
            await asyncio.sleep(1)
    res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
    acked = res.get('acknowledged', False)
    if acked:
        # freshly created index: stamp it with the current version
        await self.set_index_version(self.VERSION)
        return acked
    # index pre-existed: verify its version is compatible
    index_version = await self.get_index_version()
    if index_version != self.VERSION:
        self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
        raise IndexVersionMismatch(index_version, self.VERSION)
    return acked
2021-01-17 09:50:49 +01:00
def stop(self):
    """Detach both clients and schedule them to close.

    Returns the future that completes once both close() calls finish.
    """
    sync_client, search_client = self.sync_client, self.search_client
    self.sync_client = self.search_client = None
    return asyncio.ensure_future(asyncio.gather(sync_client.close(), search_client.close()))
def delete_index(self):
    """Drop the claims index; a no-op when it does not exist."""
    return self.sync_client.indices.delete(self.index, ignore_unavailable=True)
2021-03-05 07:16:40 +01:00
async def _consume_claim_producer(self, claim_producer):
    """Translate (op, doc) pairs from *claim_producer* into bulk-API actions.

    'delete' ops become delete actions keyed by claim id; anything else is
    converted to an upsert action via extract_doc(). Progress is logged
    every 100 items and once at the end.
    """
    count = 0
    for count, (op, doc) in enumerate(claim_producer, start=1):
        if op == 'delete':
            yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
        else:
            yield extract_doc(doc, self.index)
        if count % 100 == 0:
            self.logger.info("Indexing in progress, %d claims.", count)
    self.logger.info("Indexing done for %d claims.", count)
2021-02-22 20:42:43 +01:00
2021-03-05 07:16:40 +01:00
async def claim_consumer(self, claim_producer):
    """Bulk-index every claim from *claim_producer*, then refresh the index.

    Failed items are logged and skipped (raise_on_error=False).
    """
    # NOTE(review): `touched` is collected but never read after the loop —
    # looks like leftover bookkeeping; confirm before removing.
    touched = set()
    async for ok, item in async_streaming_bulk(self.sync_client, self._consume_claim_producer(claim_producer),
                                               raise_on_error=False):
        if not ok:
            self.logger.warning("indexing failed for an item: %s", item)
        else:
            # item is {op_type: {...}}; take the op payload for its _id
            item = item.popitem()[1]
            touched.add(item['_id'])
    await self.sync_client.indices.refresh(self.index)
    self.logger.info("Indexing done.")
2021-03-24 09:35:31 +01:00
def update_filter_query(self, censor_type, blockdict, channels=False):
    """Build an update-by-query body stamping matched claims with *censor_type*
    and the channel id that censors them.

    *blockdict* maps raw hash bytes -> raw censoring-channel hash bytes; both
    sides are reversed and hex-encoded to match the indexed id format. With
    channels=True the query matches every claim inside the given channels
    instead of the claims themselves. Only documents whose current censor_type
    is lower are touched (censor_type="<N" filter).
    """
    blockdict = {key[::-1].hex(): value[::-1].hex() for key, value in blockdict.items()}
    if channels:
        update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
    else:
        update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
    key = 'channel_id' if channels else 'claim_id'
    # painless script: set censor_type and look up the censoring channel from params
    update['script'] = {
        "source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_id=params[ctx._source.{key}]",
        "lang": "painless",
        "params": blockdict
    }
    return update
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
    """Apply censorship maps to the index, refreshing after each update.

    Filtered entries are stamped Censor.SEARCH (hidden from search only);
    blocked entries are stamped Censor.RESOLVE (hidden from resolve too).
    Channel maps are applied twice: once to the channel claims themselves
    and once (channels=True) to every claim inside those channels.
    Finishes by clearing all caches so stale results are not served.
    """
    if filtered_streams:
        await self.sync_client.update_by_query(
            self.index, body=self.update_filter_query(Censor.SEARCH, filtered_streams), slices=4)
        await self.sync_client.indices.refresh(self.index)
    if filtered_channels:
        await self.sync_client.update_by_query(
            self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels), slices=4)
        await self.sync_client.indices.refresh(self.index)
        await self.sync_client.update_by_query(
            self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels, True), slices=4)
        await self.sync_client.indices.refresh(self.index)
    if blocked_streams:
        await self.sync_client.update_by_query(
            self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_streams), slices=4)
        await self.sync_client.indices.refresh(self.index)
    if blocked_channels:
        await self.sync_client.update_by_query(
            self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels), slices=4)
        await self.sync_client.indices.refresh(self.index)
        await self.sync_client.update_by_query(
            self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4)
        await self.sync_client.indices.refresh(self.index)
    self.clear_caches()
2021-05-12 02:38:05 +02:00
def clear_caches(self):
    """Drop every cached search, short-id, claim and resolution result."""
    for cache in (self.search_cache, self.short_id_cache, self.claim_cache, self.resolution_cache):
        cache.clear()
2021-02-03 17:29:41 +01:00
async def session_query(self, query_name, kwargs):
    """Session entry point: run a 'resolve' or a claim search and return
    base64-encoded Outputs.

    Search results are cached per stringified kwargs; the cache item's lock
    gives single-flight behavior so concurrent identical searches only hit
    Elasticsearch once. Resolves are not cached at this layer.
    """
    offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0
    total_referenced = []
    if query_name == 'resolve':
        total_referenced, response, censor = await self.resolve(*kwargs)
    else:
        cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
        if cache_item.result is not None:
            return cache_item.result
        async with cache_item.lock:
            # re-check under the lock: another task may have filled it
            if cache_item.result:
                return cache_item.result
            censor = Censor(Censor.SEARCH)
            if kwargs.get('no_totals'):
                response, offset, total = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
            else:
                response, offset, total = await self.search(**kwargs)
            censor.apply(response)
            total_referenced.extend(response)
            if censor.censored:
                # also fetch the uncensored rows so clients can reference them
                response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
                total_referenced.extend(response)
            result = Outputs.to_base64(
                response, await self._get_referenced_rows(total_referenced), offset, total, censor
            )
            cache_item.result = result
            return result
    return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)
async def resolve(self, *urls):
    """Resolve each url to a claim row or error.

    Returns (results, censored, censor) where `censored` mirrors `results`
    but replaces censored claims with ResolveCensoredError.
    """
    censor = Censor(Censor.RESOLVE)
    results = [await self.resolve_url(url) for url in urls]
    # just heat the cache
    await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results))
    results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)]

    censored = [
        result if not isinstance(result, dict) or not censor.censor(result)
        else ResolveCensoredError(url, result['censoring_channel_id'])
        for url, result in zip(urls, results)
    ]
    return results, censored, censor
2021-03-24 09:35:31 +01:00
def _get_from_cache_or_error(self, url: str, resolution: Union[LookupError, StreamResolution, ChannelResolution]):
    """Swap a resolution key for its cached claim row, or an error.

    A LookupError resolution is passed through unchanged; an id with no
    cached claim becomes the resolution type's lookup_error for *url*.
    """
    cached = self.claim_cache.get(resolution)
    if cached:
        return cached
    if isinstance(resolution, LookupError):
        return resolution
    return resolution.lookup_error(url)
async def get_many(self, *claim_ids):
    """Fetch claims by id, served from the claim cache after warming it.

    Returns an iterable of the cached claim rows, skipping missing ids.
    """
    await self.populate_claim_cache(*claim_ids)
    return (claim for claim in map(self.claim_cache.get, claim_ids) if claim)
async def populate_claim_cache(self, *claim_ids):
    """mget any ids missing from the claim cache and cache the found docs."""
    missing = [cid for cid in claim_ids if self.claim_cache.get(cid) is None]
    if not missing:
        return
    results = await self.search_client.mget(
        index=self.index, body={"ids": missing}
    )
    # only documents that actually exist in the index get cached
    found_docs = (doc for doc in results["docs"] if doc['found'])
    for result in expand_result(found_docs):
        self.claim_cache.set(result['claim_id'], result)
async def full_id_from_short_id(self, name, short_id, channel_id=None):
    """Expand a short claim-id prefix to the full claim id (cached).

    Within a channel, the earliest claim by channel join order wins (and must
    be validly signed); otherwise the earliest by creation height wins.
    Returns None when no single match is found.
    """
    key = '#'.join((channel_id or '', name, short_id))
    if key not in self.short_id_cache:
        query = {'name': name, 'claim_id': short_id}
        if channel_id:
            query['channel_id'] = channel_id
            query['order_by'] = ['^channel_join']
            query['signature_valid'] = True
        else:
            query['order_by'] = '^creation_height'
        result, _, _ = await self.search(**query, limit=1)
        if len(result) == 1:
            result = result[0]['claim_id']
            self.short_id_cache[key] = result
    return self.short_id_cache.get(key, None)
async def search(self, **kwargs):
    """Run a claim search; returns (results, offset, total).

    A 'channel' argument is resolved to a channel_id first; when the channel
    cannot be resolved the search short-circuits to an empty result. A
    missing index (NotFoundError) also yields an empty result.
    """
    if 'channel' in kwargs:
        kwargs['channel_id'] = await self.resolve_url(kwargs.pop('channel'))
        if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str):
            return [], 0, 0
    try:
        return await self.search_ahead(**kwargs)
    except NotFoundError:
        return [], 0, 0
    # removed: an unreachable trailing `return expand_result(result[...])...`
    # left over from a refactor — both try/except paths already return, and
    # it referenced an undefined local `result`.
async def search_ahead(self, **kwargs):
    """Search with optional repost dedup and per-channel page quotas.

    Fetches up to 1000 lightweight hits (ids only), optionally removes
    duplicate reposts, optionally reorders so each page contains at most
    `limit_claims_per_channel` claims per channel, caches the reordered id
    list, and finally inflates just the requested page via get_many().
    Returns (results, 0, total_hits).
    """
    # 'limit_claims_per_channel' case. Fetch 1000 results, reorder, slice, inflate and return
    per_channel_per_page = kwargs.pop('limit_claims_per_channel', 0) or 0
    remove_duplicates = kwargs.pop('remove_duplicates', False)
    page_size = kwargs.pop('limit', 10)
    offset = kwargs.pop('offset', 0)
    kwargs['limit'] = 1000
    cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
    if cache_item.result is not None:
        reordered_hits = cache_item.result
    else:
        async with cache_item.lock:
            # re-check under the lock: another task may have filled it
            if cache_item.result:
                reordered_hits = cache_item.result
            else:
                query = expand_query(**kwargs)
                search_hits = deque((await self.search_client.search(
                    query, index=self.index, track_total_hits=False,
                    _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
                ))['hits']['hits'])
                if remove_duplicates:
                    search_hits = self.__remove_duplicates(search_hits)
                if per_channel_per_page > 0:
                    reordered_hits = self.__search_ahead(search_hits, page_size, per_channel_per_page)
                else:
                    reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
                cache_item.result = reordered_hits
    result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
    return result, 0, len(reordered_hits)
2021-04-23 05:50:35 +02:00
2021-05-20 06:20:25 +02:00
def __remove_duplicates(self, search_hits: deque) -> deque:
    """Drop duplicate hits pointing at the same claim, keeping the earliest.

    Hits are keyed by the claim they ultimately reference (the reposted
    claim id when present, otherwise the hit itself); among duplicates the
    one with the lowest creation_height survives.
    """
    known_ids = {}  # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
    dropped = set()
    for hit in search_hits:
        hit_height, hit_id = hit['_source']['creation_height'], hit['_source']['reposted_claim_id'] or hit['_id']
        if hit_id not in known_ids:
            known_ids[hit_id] = (hit_height, hit['_id'])
        else:
            previous_height, previous_id = known_ids[hit_id]
            if hit_height < previous_height:
                # this hit is older: it wins, the previous holder is dropped
                known_ids[hit_id] = (hit_height, hit['_id'])
                dropped.add(previous_id)
            else:
                dropped.add(hit['_id'])
    return deque(hit for hit in search_hits if hit['_id'] not in dropped)
def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
    """Reorder hits so each page holds at most *per_channel_per_page* claims
    per channel.

    Hits over a channel's quota are deferred to later pages via a side
    queue; quotas reset at each page boundary. Returns a list of
    (claim_id, channel_id) tuples in display order.
    """
    reordered_hits = []
    channel_counters = Counter()
    next_page_hits_maybe_check_later = deque()
    while search_hits or next_page_hits_maybe_check_later:
        if reordered_hits and len(reordered_hits) % page_size == 0:
            # a new page starts: per-channel quotas reset
            channel_counters.clear()
        elif not reordered_hits:
            pass
        else:
            break  # means last page was incomplete and we are left with bad replacements
        # first, try to place hits deferred from earlier pages
        for _ in range(len(next_page_hits_maybe_check_later)):
            claim_id, channel_id = next_page_hits_maybe_check_later.popleft()
            if per_channel_per_page > 0 and channel_counters[channel_id] < per_channel_per_page:
                reordered_hits.append((claim_id, channel_id))
                channel_counters[channel_id] += 1
            else:
                next_page_hits_maybe_check_later.append((claim_id, channel_id))
        # then consume fresh hits until the page fills
        while search_hits:
            hit = search_hits.popleft()
            hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id']
            if hit_channel_id is None or per_channel_per_page <= 0:
                reordered_hits.append((hit_id, hit_channel_id))
            elif channel_counters[hit_channel_id] < per_channel_per_page:
                reordered_hits.append((hit_id, hit_channel_id))
                channel_counters[hit_channel_id] += 1
                if len(reordered_hits) % page_size == 0:
                    break
            else:
                next_page_hits_maybe_check_later.append((hit_id, hit_channel_id))
    return reordered_hits
async def resolve_url(self, raw_url):
    """Resolve *raw_url*, memoizing the outcome (id string or error) per url."""
    if raw_url not in self.resolution_cache:
        self.resolution_cache[raw_url] = await self._resolve_url(raw_url)
    return self.resolution_cache[raw_url]
async def _resolve_url(self, raw_url):
    """Resolve a raw url to a Stream/ChannelResolution or an error.

    Returns the ValueError for unparseable urls, a LookupError when the
    channel cannot be found, a StreamResolution for urls with a stream
    part (wrapping either the claim id or the stream LookupError text),
    or a ChannelResolution otherwise.
    """
    try:
        url = URL.parse(raw_url)
    except ValueError as e:
        return e

    # default outcome when no stream match is found
    stream = LookupError(f'Could not find claim at "{raw_url}".')

    channel_id = await self.resolve_channel_id(url)
    if isinstance(channel_id, LookupError):
        return channel_id
    stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream
    if url.has_stream:
        return StreamResolution(stream)
    else:
        return ChannelResolution(channel_id)
2021-03-02 23:58:54 +01:00
async def resolve_channel_id(self, url: URL):
    """Return the channel claim id for *url*, a LookupError when it cannot
    be found, or None when the url has no channel part."""
    if not url.has_channel:
        return
    if url.channel.is_fullid:
        return url.channel.claim_id
    if url.channel.is_shortid:
        channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id)
        if not channel_id:
            return LookupError(f'Could not find channel in "{url}".')
        return channel_id

    query = url.channel.to_dict()
    if set(query) == {'name'}:
        # a bare name resolves to the controlling (winning) claim
        query['is_controlling'] = True
    else:
        query['order_by'] = ['^creation_height']
    matches, _, _ = await self.search(**query, limit=1)
    if matches:
        channel_id = matches[0]['claim_id']
    else:
        return LookupError(f'Could not find channel in "{url}".')
    return channel_id
async def resolve_stream(self, url: URL, channel_id: str = None):
    """Return the stream claim id for *url* (optionally inside *channel_id*).

    Returns None when the url has no stream part, when its channel failed
    to resolve, or when no match is found.
    """
    if not url.has_stream:
        return None
    if url.has_channel and channel_id is None:
        return None
    query = url.stream.to_dict()
    if url.stream.claim_id is not None:
        # explicit claim id: either use it directly or expand the short form
        if url.stream.is_fullid:
            claim_id = url.stream.claim_id
        else:
            claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id)
        return claim_id

    if channel_id is not None:
        if set(query) == {'name'}:
            # temporarily emulate is_controlling for claims in channel
            query['order_by'] = ['effective_amount', '^height']
        else:
            query['order_by'] = ['^channel_join']
        query['channel_id'] = channel_id
        query['signature_valid'] = True
    elif set(query) == {'name'}:
        query['is_controlling'] = True
    matches, _, _ = await self.search(**query, limit=1)
    if matches:
        return matches[0]['claim_id']
async def _get_referenced_rows(self, txo_rows: List[dict]):
    """Fetch rows referenced by the given results.

    First pass: reposted claims, channels and censoring channels of the
    results. Second pass: channels of the rows fetched in the first pass
    (e.g. the channel of a reposted claim).
    """
    txo_rows = [row for row in txo_rows if isinstance(row, dict)]
    referenced_ids = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows)))
    referenced_ids |= set(filter(None, (row['channel_id'] for row in txo_rows)))
    referenced_ids |= set(filter(None, (row['censoring_channel_id'] for row in txo_rows)))

    referenced_txos = []
    if referenced_ids:
        referenced_txos.extend(await self.get_many(*referenced_ids))
        # second pass: channels of the referenced rows themselves
        referenced_ids = set(filter(None, (row['channel_id'] for row in referenced_txos)))
    if referenced_ids:
        referenced_txos.extend(await self.get_many(*referenced_ids))

    return referenced_txos
2021-01-17 09:50:49 +01:00
def extract_doc(doc, index):
    """Convert a wallet-server claim row into an ES bulk upsert action.

    Raw hash bytes are reversed and hex-encoded for indexing, several
    fields are renamed to their index names, and any key not listed in
    ALL_FIELDS is dropped from the document.
    """
    doc['claim_id'] = doc.pop('claim_hash')[::-1].hex()
    if doc['reposted_claim_hash'] is not None:
        doc['reposted_claim_id'] = doc.pop('reposted_claim_hash')[::-1].hex()
    else:
        doc['reposted_claim_id'] = None
    channel_hash = doc.pop('channel_hash')
    doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
    doc['censoring_channel_id'] = doc.get('censoring_channel_id')
    # txo_hash layout: 32-byte tx hash followed by little-endian uint32 nout
    txo_hash = doc.pop('txo_hash')
    doc['tx_id'] = txo_hash[:32][::-1].hex()
    doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
    doc['repost_count'] = doc.pop('reposted')
    doc['is_controlling'] = bool(doc['is_controlling'])
    # empty byte strings normalize to None rather than ''
    doc['signature'] = (doc.pop('signature') or b'').hex() or None
    doc['signature_digest'] = (doc.pop('signature_digest') or b'').hex() or None
    doc['public_key_bytes'] = (doc.pop('public_key_bytes') or b'').hex() or None
    doc['public_key_id'] = (doc.pop('public_key_hash') or b'').hex() or None
    doc['is_signature_valid'] = bool(doc['signature_valid'])
    doc['claim_type'] = doc.get('claim_type', 0) or 0
    doc['stream_type'] = int(doc.get('stream_type', 0) or 0)
    doc['has_source'] = bool(doc['has_source'])
    doc['normalized_name'] = doc.pop('normalized')
    doc = {key: value for key, value in doc.items() if key in ALL_FIELDS}
    return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', 'doc_as_upsert': True}
def expand_query(**kwargs):
    """Translate claim-search kwargs into an Elasticsearch query body.

    Handles field renames (REPLACEMENTS), range operators (<=, >=, <, >),
    `__in` list parameters (capped at 2048 values), partial claim-id
    prefixes, tag/language helpers and the has_channel_signature /
    signature_valid / has_source special cases. Returns the final dict
    ready to pass to the search API.

    Fixes vs. previous revision: removed a second, unreachable
    `elif key == 'any_languages'` branch (the reachable one, which applies
    clean_tags(), is kept) and hoisted the loop-invariant `ops` table.
    """
    if "amount_order" in kwargs:
        kwargs["limit"] = 1
        kwargs["order_by"] = "effective_amount"
        kwargs["offset"] = int(kwargs["amount_order"]) - 1
    if 'name' in kwargs:
        kwargs['name'] = normalize_name(kwargs.pop('name'))
    if kwargs.get('is_controlling') is False:
        kwargs.pop('is_controlling')
    query = {'must': [], 'must_not': []}
    collapse = None
    # range operator prefixes -> ES range query keys
    ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
    for key, value in kwargs.items():
        key = key.replace('claim.', '')
        many = key.endswith('__in') or isinstance(value, list)
        if many and len(value) > 2048:
            raise TooManyClaimSearchParametersError(key, 2048)
        if many:
            key = key.replace('__in', '')
            value = list(filter(None, value))
        if value is None or isinstance(value, list) and len(value) == 0:
            continue
        key = REPLACEMENTS.get(key, key)
        if key in FIELDS:
            partial_id = False
            if key == 'claim_type':
                if isinstance(value, str):
                    value = CLAIM_TYPES[value]
                else:
                    value = [CLAIM_TYPES[claim_type] for claim_type in value]
            if key == '_id':
                # raw hashes are indexed reversed and hex-encoded
                if isinstance(value, Iterable):
                    value = [item[::-1].hex() for item in value]
                else:
                    value = value[::-1].hex()
            if not many and key in ('_id', 'claim_id') and len(value) < 20:
                # shorter than a full 20-byte id: treat as a prefix search
                partial_id = True
            if key == 'public_key_id':
                value = Base58.decode(value)[1:21].hex()
            if key in ('signature_valid', 'has_source'):
                continue  # handled later
            if key in TEXT_FIELDS:
                key += '.keyword'
            if partial_id:
                query['must'].append({"prefix": {"claim_id": value}})
            elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
                operator_length = 2 if value[:2] in ops else 1
                operator, value = value[:operator_length], value[operator_length:]
                if key == 'fee_amount':
                    value = str(Decimal(value)*1000)
                query['must'].append({"range": {key: {ops[operator]: value}}})
            elif many:
                query['must'].append({"terms": {key: value}})
            else:
                if key == 'fee_amount':
                    value = str(Decimal(value)*1000)
                query['must'].append({"term": {key: {"value": value}}})
        elif key == 'not_channel_ids':
            for channel_id in value:
                # exclude both claims in the channel and the channel itself
                query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
                query['must_not'].append({"term": {'_id': channel_id}})
        elif key == 'channel_ids':
            query['must'].append({"terms": {'channel_id.keyword': value}})
        elif key == 'claim_ids':
            query['must'].append({"terms": {'claim_id.keyword': value}})
        elif key == 'media_types':
            query['must'].append({"terms": {'media_type.keyword': value}})
        elif key == 'stream_types':
            query['must'].append({"terms": {'stream_type': [STREAM_TYPES[stype] for stype in value]}})
        elif key == 'any_languages':
            query['must'].append({"terms": {'languages': clean_tags(value)}})
        elif key == 'all_languages':
            query['must'].extend([{"term": {'languages': tag}} for tag in value])
        elif key == 'any_tags':
            query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
        elif key == 'all_tags':
            query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
        elif key == 'not_tags':
            query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
        elif key == 'not_claim_id':
            query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
        elif key == 'limit_claims_per_channel':
            collapse = ('channel_id.keyword', value)
    if kwargs.get('has_channel_signature'):
        query['must'].append({"exists": {"field": "signature_digest"}})
        if 'signature_valid' in kwargs:
            query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
    elif 'signature_valid' in kwargs:
        # unsigned claims OR claims whose signature validity matches
        query.setdefault('should', [])
        query["minimum_should_match"] = 1
        query['should'].append({"bool": {"must_not": {"exists": {"field": "signature_digest"}}}})
        query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
    if 'has_source' in kwargs:
        query.setdefault('should', [])
        query["minimum_should_match"] = 1
        is_stream_or_repost = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
        # has_source only applies to streams/reposts; other claim types and
        # channel reposts always match
        query['should'].append(
            {"bool": {"must": [{"match": {"has_source": kwargs['has_source']}}, is_stream_or_repost]}})
        query['should'].append({"bool": {"must_not": [is_stream_or_repost]}})
        query['should'].append({"bool": {"must": [{"term": {"reposted_claim_type": CLAIM_TYPES['channel']}}]}})
    if kwargs.get('text'):
        query['must'].append(
            {"simple_query_string":
                 {"query": kwargs["text"], "fields": [
                     "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
                 ]}})
    query = {
        "_source": {"excludes": ["description", "title"]},
        'query': {'bool': query},
        "sort": [],
    }
    if "limit" in kwargs:
        query["size"] = kwargs["limit"]
    if 'offset' in kwargs:
        query["from"] = kwargs["offset"]
    if 'order_by' in kwargs:
        if isinstance(kwargs["order_by"], str):
            kwargs["order_by"] = [kwargs["order_by"]]
        for value in kwargs['order_by']:
            if 'trending_group' in value:
                # fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
                continue
            is_asc = value.startswith('^')
            value = value[1:] if is_asc else value
            value = REPLACEMENTS.get(value, value)
            if value in TEXT_FIELDS:
                value += '.keyword'
            query['sort'].append({value: "asc" if is_asc else "desc"})
    if collapse:
        query["collapse"] = {
            "field": collapse[0],
            "inner_hits": {
                "name": collapse[0],
                "size": collapse[1],
                "sort": query["sort"]
            }
        }
    return query
def expand_result(results):
    """Convert ES hit documents back into wallet-server row dicts.

    Hex-encoded ids become (reversed) hash bytes again and renamed fields
    are restored. Collapsed results carry their payload in "inner_hits";
    when any are present, the function recurses over those hits instead.
    """
    inner_hits = []
    expanded = []
    for hit in results:
        if hit.get("inner_hits"):
            # collapsed hit: the real documents live inside inner_hits
            for inner_hit in hit["inner_hits"].values():
                inner_hits.extend(inner_hit["hits"]["hits"])
            continue
        row = hit['_source']
        row['claim_hash'] = unhexlify(row['claim_id'])[::-1]
        row['reposted_claim_hash'] = (
            unhexlify(row['reposted_claim_id'])[::-1] if row['reposted_claim_id'] else None
        )
        row['channel_hash'] = unhexlify(row['channel_id'])[::-1] if row['channel_id'] else None
        tx_hash = unhexlify(row['tx_id'])[::-1]
        row['txo_hash'] = tx_hash + struct.pack('<I', row['tx_nout'])
        row['tx_hash'] = tx_hash
        row['reposted'] = row.pop('repost_count')
        row['signature_valid'] = row.pop('is_signature_valid')
        row['normalized'] = row.pop('normalized_name')
        expanded.append(row)
    return expand_result(inner_hits) if inner_hits else expanded
2021-03-05 09:39:36 +01:00
class ResultCacheItem:
    """One cache slot for a serialized search result.

    Carries a lock (single-flight: only one task computes the result) and
    an event that fires once a non-None result has been stored.
    """
    __slots__ = '_result', 'lock', 'has_result'

    def __init__(self):
        self.has_result = asyncio.Event()
        self.lock = asyncio.Lock()
        self._result = None

    @property
    def result(self) -> str:
        """The stored result, or None while it is still being computed."""
        return self._result

    @result.setter
    def result(self, result: str):
        self._result = result
        if result is not None:
            self.has_result.set()

    @classmethod
    def from_cache(cls, cache_key, cache):
        """Return the item stored under *cache_key*, creating one if absent."""
        cache_item = cache.get(cache_key)
        if cache_item is None:
            cache_item = cache[cache_key] = cls()
        return cache_item