caching for resolve

Victor Shyba 2021-03-02 19:58:54 -03:00
parent 5a9338a27f
commit 02eb789f84
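The diff below adds two in-memory LRU caches to SearchIndex (search_cache and channel_cache, each sized 2 ** 16 entries), reuses them across resolve calls, and clears both after every reindex. For orientation, here is a minimal sketch of the LRU behaviour the commit relies on; it is not the lbry.utils.LRUCache implementation, only an illustration of the get/set/clear/membership interface the diff uses.

from collections import OrderedDict

class SimpleLRUCache:
    """Illustrative stand-in for lbry.utils.LRUCache, not the real implementation."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._items = OrderedDict()

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)         # mark as most recently used
        return self._items[key]

    def set(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)  # evict the least recently used entry

    def clear(self):
        self._items.clear()

    def __contains__(self, key):
        return key in self._items

    def __getitem__(self, key):
        return self._items[key]

The commit instantiates two such caches in SearchIndex.__init__ and clears them once indexing finishes, so cached documents never outlive a reindex.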


@@ -13,6 +13,7 @@ from lbry.error import ResolveCensoredError, claim_id
 from lbry.schema.result import Outputs, Censor
 from lbry.schema.tags import clean_tags
 from lbry.schema.url import URL, normalize_name
+from lbry.utils import LRUCache
 from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
 from lbry.wallet.server.util import class_logger
@@ -23,6 +24,8 @@ class SearchIndex:
         self.index = index_prefix + 'claims'
         self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
         self.logger = class_logger(__name__, self.__class__.__name__)
+        self.search_cache = LRUCache(2 ** 16)
+        self.channel_cache = LRUCache(2 ** 16)
 
     async def start(self):
         if self.client:
@@ -97,6 +100,8 @@ class SearchIndex:
         await self.client.indices.refresh(self.index)
         await self.client.indices.flush(self.index)
         self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())
+        self.search_cache.clear()
+        self.channel_cache.clear()
 
     async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
         def make_query(censor_type, blockdict, channels=False):
@@ -151,7 +156,7 @@
     async def resolve(self, *urls):
         censor = Censor(Censor.RESOLVE)
-        results = await asyncio.gather(*(self.resolve_url(url) for url in urls))
+        results = [await self.resolve_url(url) for url in urls]
         censored = [
             result if not isinstance(result, dict) or not censor.censor(result)
             else ResolveCensoredError(url, result['censoring_channel_hash'])
@@ -160,9 +165,15 @@
         return results, censored, censor
 
     async def get_many(self, *claim_ids):
-        results = await self.client.mget(index=self.index, body={"ids": claim_ids})
-        results = filter(lambda doc: doc['found'], results["docs"])
-        return expand_result(results)
+        cached = {claim_id: self.search_cache.get(claim_id) for claim_id in claim_ids if claim_id in self.search_cache}
+        missing = {claim_id for claim_id in claim_ids if claim_id not in cached}
+        if missing:
+            results = await self.client.mget(index=self.index, body={"ids": claim_ids},
+                                             _source_excludes=['description', 'title'])
+            results = expand_result(filter(lambda doc: doc['found'], results["docs"]))
+            for result in results:
+                self.search_cache.set(result['claim_id'], result)
+        return list(filter(None, map(self.search_cache.get, claim_ids)))
 
     async def search(self, **kwargs):
         if 'channel' in kwargs:
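get_many now fronts the Elasticsearch mget with search_cache: cached claims are served directly, the rest are fetched (minus the heavy description and title fields) and written back into the cache. A hedged sketch of that cache-aside pattern, with fetch_docs as a hypothetical stand-in for the mget call:

async def cached_get_many(cache, fetch_docs, claim_ids):
    # serve what we can from the cache, fetch only the misses, then backfill
    missing = [claim_id for claim_id in claim_ids if claim_id not in cache]
    if missing:
        for doc in await fetch_docs(missing):
            cache.set(doc['claim_id'], doc)
    # return results in input order, dropping ids that were not found anywhere
    return list(filter(None, map(cache.get, claim_ids)))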
@@ -183,39 +194,76 @@ class SearchIndex:
         except ValueError as e:
             return e
-        channel = None
-        if url.has_channel:
-            query = url.channel.to_dict()
-            if set(query) == {'name'}:
-                query['is_controlling'] = True
-            else:
-                query['order_by'] = ['^creation_height']
-            matches, _, _ = await self.search(**query, limit=1)
-            if matches:
-                channel = matches[0]
-            else:
-                return LookupError(f'Could not find channel in "{raw_url}".')
+        stream = LookupError(f'Could not find claim at "{raw_url}".')
+        channel_id = await self.resolve_channel_id(url)
+        if isinstance(channel_id, LookupError):
+            return channel_id
+        stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream
         if url.has_stream:
-            query = url.stream.to_dict()
-            if channel is not None:
-                if set(query) == {'name'}:
-                    # temporarily emulate is_controlling for claims in channel
-                    query['order_by'] = ['effective_amount', '^height']
-                else:
-                    query['order_by'] = ['^channel_join']
-                query['channel_id'] = channel['claim_id']
-                query['signature_valid'] = True
-            elif set(query) == {'name'}:
-                query['is_controlling'] = True
+            result = stream
+        else:
+            if isinstance(channel_id, str):
+                result = (await self.get_many(channel_id))
+                result = result[0] if len(result) else LookupError(f'Could not find channel in "{url}".')
+            else:
+                result = channel_id
+        return result
+
+    async def resolve_channel_id(self, url: URL):
+        if not url.has_channel:
+            return
+        key = 'cid:' + str(url.channel)
+        if key in self.channel_cache:
+            return self.channel_cache[key]
+        query = url.channel.to_dict()
+        if set(query) == {'name'}:
+            query['is_controlling'] = True
+        else:
+            query['order_by'] = ['^creation_height']
+        if len(query.get('claim_id', '')) != 40:
             matches, _, _ = await self.search(**query, limit=1)
             if matches:
-                return matches[0]
+                channel_id = matches[0]['claim_id']
             else:
-                return LookupError(f'Could not find claim at "{raw_url}".')
+                return LookupError(f'Could not find channel in "{url}".')
+        else:
+            channel_id = query['claim_id']
+        self.channel_cache.set(key, channel_id)
+        return channel_id
-        return channel
+
+    async def resolve_stream(self, url: URL, channel_id: str = None):
+        if not url.has_stream:
+            return None
+        if url.has_channel and channel_id is None:
+            return None
+        query = url.stream.to_dict()
+        stream = None
+        if 'claim_id' in query and len(query['claim_id']) == 40:
+            stream = (await self.get_many(query['claim_id']))
+            stream = stream[0] if len(stream) else None
+        else:
+            key = (channel_id or '') + str(url.stream)
+            if key in self.search_cache:
+                return self.search_cache[key]
+        if channel_id is not None:
+            if set(query) == {'name'}:
+                # temporarily emulate is_controlling for claims in channel
+                query['order_by'] = ['effective_amount', '^height']
+            else:
+                query['order_by'] = ['^channel_join']
+            query['channel_id'] = channel_id
+            query['signature_valid'] = True
+        elif set(query) == {'name'}:
+            query['is_controlling'] = True
+        if not stream:
+            matches, _, _ = await self.search(**query, limit=1)
+            if matches:
+                stream = matches[0]
+            key = (channel_id or '') + str(url.stream)
+            self.search_cache.set(key, stream)
+        return stream
 
     async def _get_referenced_rows(self, txo_rows: List[dict]):
         txo_rows = [row for row in txo_rows if isinstance(row, dict)]
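The resolve path is now split into resolve_channel_id and resolve_stream, each memoised with a string key derived from the parsed URL: channel ids live in channel_cache under 'cid:' plus the channel part, and resolved streams live in search_cache under the signing channel's claim id (empty when unsigned) plus the stream part. A small recap of that key scheme, with made-up example values:

from lbry.schema.url import URL

url = URL.parse('lbry://@somechannel/some-stream')        # hypothetical URL
channel_id = '2cf7c4c924a1f62ccb7f4b6a5e6b2b57f1b2ff34'   # hypothetical 40-character claim id

channel_key = 'cid:' + str(url.channel)                   # channel_cache: URL channel part -> claim id
stream_key = (channel_id or '') + str(url.stream)         # search_cache: claim id + stream part -> stream doc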
@@ -226,7 +274,7 @@ class SearchIndex:
         reposted_txos = []
         if repost_hashes:
             reposted_txos = await self.get_many(*repost_hashes)
-            channel_hashes |= set(filter(None, (row['channel_hash'] for row in reposted_txos)))
+            channel_hashes |= set(filter(None, (row['channel_id'] for row in reposted_txos)))
 
         channel_txos = []
         if channel_hashes:
@@ -398,7 +446,7 @@ def expand_query(**kwargs):
         if isinstance(kwargs["order_by"], str):
             kwargs["order_by"] = [kwargs["order_by"]]
         for value in kwargs['order_by']:
-            if 'trending_mixed' in value:
+            if 'trending_group' in value:
                 # fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
                 continue
             is_asc = value.startswith('^')