Report API-level search timeouts in interrupt_count_metric.

This commit is contained in:
Jonathan Moody 2022-08-18 11:10:10 -04:00
parent a3b77951f0
commit 28c711efad
2 changed files with 19 additions and 10 deletions

View file

@ -146,11 +146,11 @@ class SearchIndex:
total_referenced = [] total_referenced = []
cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache) cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
if cache_item.result is not None: if cache_item.result is not None:
return cache_item.result return cache_item.result, False
async with cache_item.lock: async with cache_item.lock:
if cache_item.result: if cache_item.result:
return cache_item.result return cache_item.result, False
response, offset, total = await self.search(**kwargs) response, offset, total, timed_out = await self.search(**kwargs)
censored = {} censored = {}
for row in response: for row in response:
if (row.get('censor_type') or 0) >= Censor.SEARCH: if (row.get('censor_type') or 0) >= Censor.SEARCH:
@ -159,7 +159,7 @@ class SearchIndex:
censored[censoring_channel_hash].add(row['tx_hash']) censored[censoring_channel_hash].add(row['tx_hash'])
total_referenced.extend(response) total_referenced.extend(response)
if censored: if censored:
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) response, _, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
total_referenced.extend(response) total_referenced.extend(response)
response = [self._make_resolve_result(r) for r in response] response = [self._make_resolve_result(r) for r in response]
extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)] extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
@ -167,7 +167,7 @@ class SearchIndex:
response, extra, offset, total, censored response, extra, offset, total, censored
) )
cache_item.result = result cache_item.result = result
return result return result, timed_out
async def get_many(self, *claim_ids): async def get_many(self, *claim_ids):
await self.populate_claim_cache(*claim_ids) await self.populate_claim_cache(*claim_ids)
@ -186,7 +186,7 @@ class SearchIndex:
try: try:
return await self.search_ahead(**kwargs) return await self.search_ahead(**kwargs)
except NotFoundError: except NotFoundError:
return [], 0, 0 return [], 0, 0, False
# return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0) # return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)
async def search_ahead(self, **kwargs): async def search_ahead(self, **kwargs):
@ -196,20 +196,25 @@ class SearchIndex:
page_size = kwargs.pop('limit', 10) page_size = kwargs.pop('limit', 10)
offset = kwargs.pop('offset', 0) offset = kwargs.pop('offset', 0)
kwargs['limit'] = 1000 kwargs['limit'] = 1000
timed_out = None
cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache) cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
if cache_item.result is not None: if cache_item.result is not None:
reordered_hits = cache_item.result reordered_hits = cache_item.result
timed_out = False
else: else:
async with cache_item.lock: async with cache_item.lock:
if cache_item.result: if cache_item.result:
reordered_hits = cache_item.result reordered_hits = cache_item.result
timed_out = False
else: else:
query = expand_query(**kwargs) query = expand_query(**kwargs)
search_hits = deque((await self.search_client.search( es_resp = await self.search_client.search(
query, index=self.index, track_total_hits=False, query, index=self.index, track_total_hits=False,
timeout=f'{int(1000*self.search_timeout)}ms', timeout=f'{int(1000*self.search_timeout)}ms',
_source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height'] _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
))['hits']['hits']) )
search_hits = deque(es_resp['hits']['hits'])
timed_out = es_resp['timed_out']
if remove_duplicates: if remove_duplicates:
search_hits = self.__remove_duplicates(search_hits) search_hits = self.__remove_duplicates(search_hits)
if per_channel_per_page > 0: if per_channel_per_page > 0:
@ -218,7 +223,7 @@ class SearchIndex:
reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits] reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
cache_item.result = reordered_hits cache_item.result = reordered_hits
result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)]))) result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
return result, 0, len(reordered_hits) return result, 0, len(reordered_hits), timed_out
def __remove_duplicates(self, search_hits: deque) -> deque: def __remove_duplicates(self, search_hits: deque) -> deque:
known_ids = {} # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original known_ids = {} # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original

View file

@ -1261,7 +1261,11 @@ class LBRYElectrumX(asyncio.Protocol):
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)): if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
return Outputs.to_base64([], []) return Outputs.to_base64([], [])
kwargs['channel_id'] = channel_claim.claim_hash.hex() kwargs['channel_id'] = channel_claim.claim_hash.hex()
return await self.session_manager.search_index.cached_search(kwargs)
result, timed_out = await self.session_manager.search_index.cached_search(kwargs)
if timed_out:
self.session_manager.interrupt_count_metric.inc()
return result
except ConnectionTimeout: except ConnectionTimeout:
self.session_manager.interrupt_count_metric.inc() self.session_manager.interrupt_count_metric.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')