From 02eb789f84066b97dc75003d02d6cf68bf89262b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 2 Mar 2021 19:58:54 -0300 Subject: [PATCH] caching for resolve --- lbry/wallet/server/db/elastic_search.py | 114 +++++++++++++++++------- 1 file changed, 81 insertions(+), 33 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index e1f20dce7..803298d5d 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -13,6 +13,7 @@ from lbry.error import ResolveCensoredError, claim_id from lbry.schema.result import Outputs, Censor from lbry.schema.tags import clean_tags from lbry.schema.url import URL, normalize_name +from lbry.utils import LRUCache from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES from lbry.wallet.server.util import class_logger @@ -23,6 +24,8 @@ class SearchIndex: self.index = index_prefix + 'claims' self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import self.logger = class_logger(__name__, self.__class__.__name__) + self.search_cache = LRUCache(2 ** 16) + self.channel_cache = LRUCache(2 ** 16) async def start(self): if self.client: @@ -97,6 +100,8 @@ class SearchIndex: await self.client.indices.refresh(self.index) await self.client.indices.flush(self.index) self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize()) + self.search_cache.clear() + self.channel_cache.clear() async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): def make_query(censor_type, blockdict, channels=False): @@ -151,7 +156,7 @@ class SearchIndex: async def resolve(self, *urls): censor = Censor(Censor.RESOLVE) - results = await asyncio.gather(*(self.resolve_url(url) for url in urls)) + results = [await self.resolve_url(url) for url in urls] censored = [ result if not isinstance(result, dict) or not censor.censor(result) else ResolveCensoredError(url, result['censoring_channel_hash']) @@ -160,9 +165,15 @@ class SearchIndex: return results, censored, censor async def get_many(self, *claim_ids): - results = await self.client.mget(index=self.index, body={"ids": claim_ids}) - results = filter(lambda doc: doc['found'], results["docs"]) - return expand_result(results) + cached = {claim_id: self.search_cache.get(claim_id) for claim_id in claim_ids if claim_id in self.search_cache} + missing = {claim_id for claim_id in claim_ids if claim_id not in cached} + if missing: + results = await self.client.mget(index=self.index, body={"ids": claim_ids}, + _source_excludes=['description', 'title']) + results = expand_result(filter(lambda doc: doc['found'], results["docs"])) + for result in results: + self.search_cache.set(result['claim_id'], result) + return list(filter(None, map(self.search_cache.get, claim_ids))) async def search(self, **kwargs): if 'channel' in kwargs: @@ -183,39 +194,76 @@ class SearchIndex: except ValueError as e: return e - channel = None - - if url.has_channel: - query = url.channel.to_dict() - if set(query) == {'name'}: - query['is_controlling'] = True - else: - query['order_by'] = ['^creation_height'] - matches, _, _ = await self.search(**query, limit=1) - if matches: - channel = matches[0] - else: - return LookupError(f'Could not find channel in "{raw_url}".') + stream = LookupError(f'Could not find claim at "{raw_url}".') + channel_id = await self.resolve_channel_id(url) + if isinstance(channel_id, LookupError): + return channel_id + stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream if url.has_stream: - query = url.stream.to_dict() - if channel is not None: - if set(query) == {'name'}: - # temporarily emulate is_controlling for claims in channel - query['order_by'] = ['effective_amount', '^height'] - else: - query['order_by'] = ['^channel_join'] - query['channel_id'] = channel['claim_id'] - query['signature_valid'] = True - elif set(query) == {'name'}: - query['is_controlling'] = True + result = stream + else: + if isinstance(channel_id, str): + result = (await self.get_many(channel_id)) + result = result[0] if len(result) else LookupError(f'Could not find channel in "{url}".') + else: + result = channel_id + + return result + + async def resolve_channel_id(self, url: URL): + if not url.has_channel: + return + key = 'cid:' + str(url.channel) + if key in self.channel_cache: + return self.channel_cache[key] + query = url.channel.to_dict() + if set(query) == {'name'}: + query['is_controlling'] = True + else: + query['order_by'] = ['^creation_height'] + if len(query.get('claim_id', '')) != 40: matches, _, _ = await self.search(**query, limit=1) if matches: - return matches[0] + channel_id = matches[0]['claim_id'] else: - return LookupError(f'Could not find claim at "{raw_url}".') + return LookupError(f'Could not find channel in "{url}".') + else: + channel_id = query['claim_id'] + self.channel_cache.set(key, channel_id) + return channel_id - return channel + async def resolve_stream(self, url: URL, channel_id: str = None): + if not url.has_stream: + return None + if url.has_channel and channel_id is None: + return None + query = url.stream.to_dict() + stream = None + if 'claim_id' in query and len(query['claim_id']) == 40: + stream = (await self.get_many(query['claim_id'])) + stream = stream[0] if len(stream) else None + else: + key = (channel_id or '') + str(url.stream) + if key in self.search_cache: + return self.search_cache[key] + if channel_id is not None: + if set(query) == {'name'}: + # temporarily emulate is_controlling for claims in channel + query['order_by'] = ['effective_amount', '^height'] + else: + query['order_by'] = ['^channel_join'] + query['channel_id'] = channel_id + query['signature_valid'] = True + elif set(query) == {'name'}: + query['is_controlling'] = True + if not stream: + matches, _, _ = await self.search(**query, limit=1) + if matches: + stream = matches[0] + key = (channel_id or '') + str(url.stream) + self.search_cache.set(key, stream) + return stream async def _get_referenced_rows(self, txo_rows: List[dict]): txo_rows = [row for row in txo_rows if isinstance(row, dict)] @@ -226,7 +274,7 @@ class SearchIndex: reposted_txos = [] if repost_hashes: reposted_txos = await self.get_many(*repost_hashes) - channel_hashes |= set(filter(None, (row['channel_hash'] for row in reposted_txos))) + channel_hashes |= set(filter(None, (row['channel_id'] for row in reposted_txos))) channel_txos = [] if channel_hashes: @@ -398,7 +446,7 @@ def expand_query(**kwargs): if isinstance(kwargs["order_by"], str): kwargs["order_by"] = [kwargs["order_by"]] for value in kwargs['order_by']: - if 'trending_mixed' in value: + if 'trending_group' in value: # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. continue is_asc = value.startswith('^')