round time to 10 minutes and fetch referenced by id

This commit is contained in:
Victor Shyba 2021-03-01 23:23:38 -03:00
parent 325419404d
commit eb6924277f
2 changed files with 34 additions and 15 deletions

View file

@ -9,7 +9,7 @@ from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from elasticsearch.helpers import async_streaming_bulk
from lbry.crypto.base58 import Base58
from lbry.error import ResolveCensoredError
from lbry.error import ResolveCensoredError, claim_id
from lbry.schema.result import Outputs, Censor
from lbry.schema.tags import clean_tags
from lbry.schema.url import URL, normalize_name
@ -159,12 +159,17 @@ class SearchIndex:
]
return results, censored, censor
async def get_many(self, *claim_ids):
results = await self.client.mget(index=self.index, body={"ids": claim_ids})
results = filter(lambda doc: doc['found'], results["docs"])
return expand_result(results)
async def search(self, **kwargs):
if 'channel' in kwargs:
result = await self.resolve_url(kwargs.pop('channel'))
if not result or not isinstance(result, Iterable):
return [], 0, 0
kwargs['channel_id'] = result['_id']
kwargs['channel_id'] = result['claim_id']
try:
result = await self.client.search(expand_query(**kwargs), index=self.index)
except NotFoundError:
@ -214,18 +219,18 @@ class SearchIndex:
async def _get_referenced_rows(self, txo_rows: List[dict]):
txo_rows = [row for row in txo_rows if isinstance(row, dict)]
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows)))
channel_hashes = set(filter(None, (row['channel_hash'] for row in txo_rows)))
channel_hashes |= set(filter(None, (row['censoring_channel_hash'] for row in txo_rows)))
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows)))
channel_hashes = set(filter(None, (row['channel_id'] for row in txo_rows)))
channel_hashes |= set(map(claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows))))
reposted_txos = []
if repost_hashes:
reposted_txos, _, _ = await self.search(limit=100, **{'claim_hash__in': list(repost_hashes)})
reposted_txos = await self.get_many(*repost_hashes)
channel_hashes |= set(filter(None, (row['channel_hash'] for row in reposted_txos)))
channel_txos = []
if channel_hashes:
channel_txos, _, _ = await self.search(limit=100, **{'claim_hash__in': list(channel_hashes)})
channel_txos = await self.get_many(*channel_hashes)
# channels must come first for client side inflation to work properly
return channel_txos + reposted_txos
@ -393,6 +398,9 @@ def expand_query(**kwargs):
if isinstance(kwargs["order_by"], str):
kwargs["order_by"] = [kwargs["order_by"]]
for value in kwargs['order_by']:
if 'trending_mixed' in value:
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
continue
is_asc = value.startswith('^')
value = value[1:] if is_asc else value
value = REPLACEMENTS.get(value, value)
@ -413,12 +421,13 @@ def expand_query(**kwargs):
def expand_result(results):
inner_hits = []
expanded = []
for result in results:
if result.get("inner_hits"):
for _, inner_hit in result["inner_hits"].items():
inner_hits.extend(inner_hit["hits"]["hits"])
continue
result.update(result.pop('_source'))
result = result['_source']
result['claim_hash'] = unhexlify(result['claim_id'])[::-1]
if result['reposted_claim_id']:
result['reposted_claim_hash'] = unhexlify(result['reposted_claim_id'])[::-1]
@ -429,6 +438,7 @@ def expand_result(results):
result['tx_hash'] = unhexlify(result['tx_id'])[::-1]
if result['censoring_channel_hash']:
result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1]
expanded.append(result)
if inner_hits:
return expand_result(inner_hits)
return results
return expanded

View file

@ -1006,12 +1006,8 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def run_and_cache_query(self, query_name, kwargs):
if isinstance(kwargs, dict) and 'trending_mixed' in kwargs.get('order_by', {}):
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
# also, release_time isnt releavant when sorting by trending but it makes cache bad
if 'release_time' in kwargs:
kwargs.pop('release_time')
kwargs['order_by'] = ['trending_mixed']
if isinstance(kwargs, dict):
kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.start()
cache = self.session_mgr.search_cache[query_name]
@ -1617,3 +1613,16 @@ def get_from_possible_keys(dictionary, *keys):
for key in keys:
if key in dictionary:
return dictionary[key]
def format_release_time(release_time):
# round release time to 1000 so it caches better
# also set a default so we dont show claims in the future
def roundup_time(number, factor=360):
return int(1 + int(number / factor)) * factor
if isinstance(release_time, str) and len(release_time) > 0:
time_digits = ''.join(filter(str.isdigit, release_time))
time_prefix = release_time[:-len(time_digits)]
return time_prefix + str(roundup_time(int(time_digits)))
elif isinstance(release_time, int):
return roundup_time(release_time)