From 8a1f0f7f3342f7f80d2583bcac97479c8bdc6c79 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Thu, 18 Aug 2022 11:10:10 -0400
Subject: [PATCH] Report API-level search timeouts in interrupt_count_metric.

---
 hub/herald/search.py  | 23 ++++++++++++++---------
 hub/herald/session.py |  6 +++++-
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/hub/herald/search.py b/hub/herald/search.py
index 0435bed..b728fc9 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -146,11 +146,11 @@ class SearchIndex:
         total_referenced = []
         cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
         if cache_item.result is not None:
-            return cache_item.result
+            return cache_item.result, False
         async with cache_item.lock:
             if cache_item.result:
-                return cache_item.result
-            response, offset, total = await self.search(**kwargs)
+                return cache_item.result, False
+            response, offset, total, timed_out = await self.search(**kwargs)
             censored = {}
             for row in response:
                 if (row.get('censor_type') or 0) >= Censor.SEARCH:
@@ -159,7 +159,7 @@
                     censored[censoring_channel_hash].add(row['tx_hash'])
             total_referenced.extend(response)
             if censored:
-                response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
+                response, _, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
                 total_referenced.extend(response)
             response = [self._make_resolve_result(r) for r in response]
             extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
@@ -167,7 +167,7 @@
                 response, extra, offset, total, censored
             )
             cache_item.result = result
-            return result
+            return result, timed_out
 
     async def get_many(self, *claim_ids):
         await self.populate_claim_cache(*claim_ids)
@@ -186,7 +186,7 @@
         try:
             return await self.search_ahead(**kwargs)
         except NotFoundError:
-            return [], 0, 0
+            return [], 0, 0, False
         # return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)
 
     async def search_ahead(self, **kwargs):
@@ -196,20 +196,25 @@
         page_size = kwargs.pop('limit', 10)
         offset = kwargs.pop('offset', 0)
         kwargs['limit'] = 1000
+        timed_out = None
         cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
         if cache_item.result is not None:
             reordered_hits = cache_item.result
+            timed_out = False
         else:
             async with cache_item.lock:
                 if cache_item.result:
                     reordered_hits = cache_item.result
+                    timed_out = False
                 else:
                     query = expand_query(**kwargs)
-                    search_hits = deque((await self.search_client.search(
+                    es_resp = await self.search_client.search(
                         query, index=self.index, track_total_hits=False,
                         timeout=f'{int(1000*self.search_timeout)}ms',
                         _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
-                    ))['hits']['hits'])
+                    )
+                    search_hits = deque(es_resp['hits']['hits'])
+                    timed_out = es_resp['timed_out']
                     if remove_duplicates:
                         search_hits = self.__remove_duplicates(search_hits)
                     if per_channel_per_page > 0:
@@ -218,7 +223,7 @@
                         reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
                     cache_item.result = reordered_hits
         result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
-        return result, 0, len(reordered_hits)
+        return result, 0, len(reordered_hits), timed_out
 
     def __remove_duplicates(self, search_hits: deque) -> deque:
         known_ids = {}  # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
diff --git a/hub/herald/session.py b/hub/herald/session.py
index 8400714..3c15d53 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -1261,7 +1261,11 @@ class LBRYElectrumX(asyncio.Protocol):
                 if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
                     return Outputs.to_base64([], [])
                 kwargs['channel_id'] = channel_claim.claim_hash.hex()
-            return await self.session_manager.search_index.cached_search(kwargs)
+
+            result, timed_out = await self.session_manager.search_index.cached_search(kwargs)
+            if timed_out:
+                self.session_manager.interrupt_count_metric.inc()
+            return result
         except ConnectionTimeout:
             self.session_manager.interrupt_count_metric.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
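
Reviewer's note (not part of the patch): the existing `except ConnectionTimeout` branch only
catches transport-level failures. Elasticsearch treats the per-request `timeout` passed to
`search()` as a soft limit: when it fires, the call still returns normally with whatever partial
hits were collected, plus a top-level `timed_out: true` flag in the response body. That is the
flag `search_ahead` now reads and threads up through `search` and `cached_search` to the session
handler. A minimal sketch of the response shape, using a unittest AsyncMock in place of the real
async `search_client`:

    # Sketch only: `search_client` is a mock; the dict mimics the documented
    # Elasticsearch search-response shape this patch inspects.
    import asyncio
    from collections import deque
    from unittest.mock import AsyncMock

    async def demo():
        search_client = AsyncMock()
        # Soft (API-level) timeout: no exception, partial hits, `timed_out` set.
        search_client.search.return_value = {
            'timed_out': True,
            'hits': {'hits': [{'_id': 'abc123', '_source': {'channel_id': 'def456'}}]},
        }
        es_resp = await search_client.search({'query': {'match_all': {}}}, timeout='100ms')
        search_hits = deque(es_resp['hits']['hits'])  # partial results remain usable
        timed_out = es_resp['timed_out']              # the flag this patch surfaces
        assert timed_out and len(search_hits) == 1

    asyncio.run(demo())

One consequence worth noting: `cached_search` returns `False` for cache hits, so
`interrupt_count_metric` counts only queries that actually reached Elasticsearch and timed out
there, matching the metric's existing semantics for hard `ConnectionTimeout`s.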