diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 8c17fed5e..e1f20dce7 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -9,7 +9,7 @@ from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch.helpers import async_streaming_bulk from lbry.crypto.base58 import Base58 -from lbry.error import ResolveCensoredError +from lbry.error import ResolveCensoredError, claim_id from lbry.schema.result import Outputs, Censor from lbry.schema.tags import clean_tags from lbry.schema.url import URL, normalize_name @@ -159,12 +159,17 @@ class SearchIndex: ] return results, censored, censor + async def get_many(self, *claim_ids): + results = await self.client.mget(index=self.index, body={"ids": claim_ids}) + results = filter(lambda doc: doc['found'], results["docs"]) + return expand_result(results) + async def search(self, **kwargs): if 'channel' in kwargs: result = await self.resolve_url(kwargs.pop('channel')) if not result or not isinstance(result, Iterable): return [], 0, 0 - kwargs['channel_id'] = result['_id'] + kwargs['channel_id'] = result['claim_id'] try: result = await self.client.search(expand_query(**kwargs), index=self.index) except NotFoundError: @@ -214,18 +219,18 @@ class SearchIndex: async def _get_referenced_rows(self, txo_rows: List[dict]): txo_rows = [row for row in txo_rows if isinstance(row, dict)] - repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows))) - channel_hashes = set(filter(None, (row['channel_hash'] for row in txo_rows))) - channel_hashes |= set(filter(None, (row['censoring_channel_hash'] for row in txo_rows))) + repost_hashes = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows))) + channel_hashes = set(filter(None, (row['channel_id'] for row in txo_rows))) + channel_hashes |= set(map(claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows)))) reposted_txos = [] if repost_hashes: - reposted_txos, _, _ = await self.search(limit=100, **{'claim_hash__in': list(repost_hashes)}) + reposted_txos = await self.get_many(*repost_hashes) channel_hashes |= set(filter(None, (row['channel_hash'] for row in reposted_txos))) channel_txos = [] if channel_hashes: - channel_txos, _, _ = await self.search(limit=100, **{'claim_hash__in': list(channel_hashes)}) + channel_txos = await self.get_many(*channel_hashes) # channels must come first for client side inflation to work properly return channel_txos + reposted_txos @@ -393,6 +398,9 @@ def expand_query(**kwargs): if isinstance(kwargs["order_by"], str): kwargs["order_by"] = [kwargs["order_by"]] for value in kwargs['order_by']: + if 'trending_mixed' in value: + # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. + continue is_asc = value.startswith('^') value = value[1:] if is_asc else value value = REPLACEMENTS.get(value, value) @@ -413,12 +421,13 @@ def expand_query(**kwargs): def expand_result(results): inner_hits = [] + expanded = [] for result in results: if result.get("inner_hits"): for _, inner_hit in result["inner_hits"].items(): inner_hits.extend(inner_hit["hits"]["hits"]) continue - result.update(result.pop('_source')) + result = result['_source'] result['claim_hash'] = unhexlify(result['claim_id'])[::-1] if result['reposted_claim_id']: result['reposted_claim_hash'] = unhexlify(result['reposted_claim_id'])[::-1] @@ -429,6 +438,7 @@ def expand_result(results): result['tx_hash'] = unhexlify(result['tx_id'])[::-1] if result['censoring_channel_hash']: result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1] + expanded.append(result) if inner_hits: return expand_result(inner_hits) - return results + return expanded diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 684558e5c..fc63f8a1f 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1006,12 +1006,8 @@ class LBRYElectrumX(SessionBase): self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) async def run_and_cache_query(self, query_name, kwargs): - if isinstance(kwargs, dict) and 'trending_mixed' in kwargs.get('order_by', {}): - # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. - # also, release_time isnt releavant when sorting by trending but it makes cache bad - if 'release_time' in kwargs: - kwargs.pop('release_time') - kwargs['order_by'] = ['trending_mixed'] + if isinstance(kwargs, dict): + kwargs['release_time'] = format_release_time(kwargs.get('release_time')) metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.start() cache = self.session_mgr.search_cache[query_name] @@ -1617,3 +1613,16 @@ def get_from_possible_keys(dictionary, *keys): for key in keys: if key in dictionary: return dictionary[key] + + +def format_release_time(release_time): + # round release time to 1000 so it caches better + # also set a default so we dont show claims in the future + def roundup_time(number, factor=360): + return int(1 + int(number / factor)) * factor + if isinstance(release_time, str) and len(release_time) > 0: + time_digits = ''.join(filter(str.isdigit, release_time)) + time_prefix = release_time[:-len(time_digits)] + return time_prefix + str(roundup_time(int(time_digits))) + elif isinstance(release_time, int): + return roundup_time(release_time)