From 73884b34bccc286c81d72a7d791eaef2d74a66f2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 14 Mar 2021 04:56:53 -0300 Subject: [PATCH] apply no_totals --- lbry/wallet/server/db/elastic_search.py | 54 +++++++++---------- .../blockchain/test_claim_commands.py | 8 +++ 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 9c1d7643a..519eb78dd 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -34,9 +34,9 @@ class SearchIndex: self.client: Optional[AsyncElasticsearch] = None self.index = index_prefix + 'claims' self.logger = class_logger(__name__, self.__class__.__name__) - self.claim_cache = LRUCache(2 ** 15) # invalidated on touched + self.claim_cache = LRUCache(2 ** 15) self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever - self.search_cache = LRUCache(2 ** 17) # fixme: dont let session manager replace it + self.search_cache = LRUCache(2 ** 17) self.resolution_cache = LRUCache(2 ** 17) async def start(self): @@ -157,10 +157,6 @@ class SearchIndex: self.claim_cache.clear() self.resolution_cache.clear() - async def delete_above_height(self, height): - await self.client.delete_by_query(self.index, expand_query(height='>'+str(height))) - await self.client.indices.refresh(self.index) - async def session_query(self, query_name, kwargs): offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0 total_referenced = [] @@ -174,7 +170,10 @@ class SearchIndex: if cache_item.result: return cache_item.result censor = Censor(Censor.SEARCH) - response, offset, total = await self.search(**kwargs) + if kwargs.get('no_totals'): + response, offset, total = await self.search(**kwargs, censor_type=0) + else: + response, offset, total = await self.search(**kwargs) censor.apply(response) total_referenced.extend(response) if censor.censored: @@ -190,12 +189,13 @@ class SearchIndex: async def resolve(self, *urls): censor = Censor(Censor.RESOLVE) results = [await self.resolve_url(url) for url in urls] - missing = await self.get_many(*filter(lambda x: isinstance(x, str), results)) + # just heat the cache + await self.get_many(*filter(lambda x: isinstance(x, str), results)) for index in range(len(results)): result = results[index] url = urls[index] - if missing.get(result): - results[index] = missing[result] + if result in self.claim_cache: + results[index] = self.claim_cache[result] elif isinstance(result, StreamResolution): results[index] = LookupError(f'Could not find claim at "{url}".') elif isinstance(result, ChannelResolution): @@ -212,12 +212,11 @@ class SearchIndex: missing = [claim_id for claim_id in claim_ids if claim_id not in self.claim_cache] if missing: results = await self.search_client.mget( - index=self.index, body={"ids": missing}, _source_excludes=['description', 'title'] + index=self.index, body={"ids": missing} ) - results = expand_result(filter(lambda doc: doc['found'], results["docs"])) - for result in results: + for result in expand_result(filter(lambda doc: doc['found'], results["docs"])): self.claim_cache.set(result['claim_id'], result) - return {claim_id: self.claim_cache[claim_id] for claim_id in claim_ids if claim_id in self.claim_cache} + return filter(None, map(self.claim_cache.get, claim_ids)) async def full_id_from_short_id(self, name, short_id, channel_id=None): key = (channel_id or '') + name + short_id @@ -244,12 +243,13 @@ class SearchIndex: kwargs['channel_id'] = channel_id try: result = await self.search_client.search( - expand_query(**kwargs), index=self.index, track_total_hits=200 + expand_query(**kwargs), index=self.index, track_total_hits=False if kwargs.get('no_totals') else 200 ) + result = result['hits'] except NotFoundError: # index has no docs, fixme: log something return [], 0, 0 - return expand_result(result['hits']['hits']), 0, result['hits']['total']['value'] + return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0) async def resolve_url(self, raw_url): if raw_url not in self.resolution_cache: @@ -325,21 +325,19 @@ class SearchIndex: async def _get_referenced_rows(self, txo_rows: List[dict]): txo_rows = [row for row in txo_rows if isinstance(row, dict)] - repost_hashes = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows))) - channel_hashes = set(filter(None, (row['channel_id'] for row in txo_rows))) - channel_hashes |= set(map(parse_claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows)))) + referenced_ids = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows))) + referenced_ids |= set(filter(None, (row['channel_id'] for row in txo_rows))) + referenced_ids |= set(map(parse_claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows)))) - reposted_txos = [] - if repost_hashes: - reposted_txos = list((await self.get_many(*repost_hashes)).values()) - channel_hashes |= set(filter(None, (row['channel_id'] for row in reposted_txos))) + referenced_txos = [] + if referenced_ids: + referenced_txos.extend(await self.get_many(*referenced_ids)) + referenced_ids = set(filter(None, (row['channel_id'] for row in referenced_txos))) - channel_txos = [] - if channel_hashes: - channel_txos = list((await self.get_many(*channel_hashes)).values()) + if referenced_ids: + referenced_txos.extend(await self.get_many(*referenced_ids)) - # channels must come first for client side inflation to work properly - return channel_txos + reposted_txos + return referenced_txos def extract_doc(doc, index): diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 02ca8a02a..5fb8bacc8 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -1376,6 +1376,14 @@ class StreamCommands(ClaimTestCase): self.assertEqual(1, filtered['channels'][0]['blocked']) self.assertTrue(filtered['channels'][0]['channel']['short_url'].startswith('lbry://@filtering#')) + # same search, but details omitted by 'no_totals' + last_result = result + result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel', no_totals=True)) + filtered = result['blocked'] + self.assertEqual(0, filtered['total']) + self.assertEqual(0, len(filtered['channels'])) + self.assertEqual(result['items'], last_result['items']) + # content was filtered by not_tag before censoring result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel', not_tags=["good", "bad"])) self.assertEqual(0, len(result['items']))