Revert "Report API-level search timeouts in interrupt_count_metric."
This reverts commit 28c711efad
.
This commit is contained in:
parent
8a1f0f7f33
commit
807e6151f2
2 changed files with 10 additions and 19 deletions
|
@ -146,11 +146,11 @@ class SearchIndex:
|
|||
total_referenced = []
|
||||
cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
|
||||
if cache_item.result is not None:
|
||||
return cache_item.result, False
|
||||
return cache_item.result
|
||||
async with cache_item.lock:
|
||||
if cache_item.result:
|
||||
return cache_item.result, False
|
||||
response, offset, total, timed_out = await self.search(**kwargs)
|
||||
return cache_item.result
|
||||
response, offset, total = await self.search(**kwargs)
|
||||
censored = {}
|
||||
for row in response:
|
||||
if (row.get('censor_type') or 0) >= Censor.SEARCH:
|
||||
|
@ -159,7 +159,7 @@ class SearchIndex:
|
|||
censored[censoring_channel_hash].add(row['tx_hash'])
|
||||
total_referenced.extend(response)
|
||||
if censored:
|
||||
response, _, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
|
||||
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
|
||||
total_referenced.extend(response)
|
||||
response = [self._make_resolve_result(r) for r in response]
|
||||
extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
|
||||
|
@ -167,7 +167,7 @@ class SearchIndex:
|
|||
response, extra, offset, total, censored
|
||||
)
|
||||
cache_item.result = result
|
||||
return result, timed_out
|
||||
return result
|
||||
|
||||
async def get_many(self, *claim_ids):
|
||||
await self.populate_claim_cache(*claim_ids)
|
||||
|
@ -186,7 +186,7 @@ class SearchIndex:
|
|||
try:
|
||||
return await self.search_ahead(**kwargs)
|
||||
except NotFoundError:
|
||||
return [], 0, 0, False
|
||||
return [], 0, 0
|
||||
# return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)
|
||||
|
||||
async def search_ahead(self, **kwargs):
|
||||
|
@ -196,25 +196,20 @@ class SearchIndex:
|
|||
page_size = kwargs.pop('limit', 10)
|
||||
offset = kwargs.pop('offset', 0)
|
||||
kwargs['limit'] = 1000
|
||||
timed_out = None
|
||||
cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
|
||||
if cache_item.result is not None:
|
||||
reordered_hits = cache_item.result
|
||||
timed_out = False
|
||||
else:
|
||||
async with cache_item.lock:
|
||||
if cache_item.result:
|
||||
reordered_hits = cache_item.result
|
||||
timed_out = False
|
||||
else:
|
||||
query = expand_query(**kwargs)
|
||||
es_resp = await self.search_client.search(
|
||||
search_hits = deque((await self.search_client.search(
|
||||
query, index=self.index, track_total_hits=False,
|
||||
timeout=f'{int(1000*self.search_timeout)}ms',
|
||||
_source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
|
||||
)
|
||||
search_hits = deque(es_resp['hits']['hits'])
|
||||
timed_out = es_resp['timed_out']
|
||||
))['hits']['hits'])
|
||||
if remove_duplicates:
|
||||
search_hits = self.__remove_duplicates(search_hits)
|
||||
if per_channel_per_page > 0:
|
||||
|
@ -223,7 +218,7 @@ class SearchIndex:
|
|||
reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
|
||||
cache_item.result = reordered_hits
|
||||
result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
|
||||
return result, 0, len(reordered_hits), timed_out
|
||||
return result, 0, len(reordered_hits)
|
||||
|
||||
def __remove_duplicates(self, search_hits: deque) -> deque:
|
||||
known_ids = {} # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
|
||||
|
|
|
@ -1261,11 +1261,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
|
||||
return Outputs.to_base64([], [])
|
||||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||
|
||||
result, timed_out = await self.session_manager.search_index.cached_search(kwargs)
|
||||
if timed_out:
|
||||
self.session_manager.interrupt_count_metric.inc()
|
||||
return result
|
||||
return await self.session_manager.search_index.cached_search(kwargs)
|
||||
except ConnectionTimeout:
|
||||
self.session_manager.interrupt_count_metric.inc()
|
||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||
|
|
Loading…
Reference in a new issue