Report API-level search timeouts in interrupt_count_metric.
This commit is contained in:
parent
3cf620ed8e
commit
8a1f0f7f33
2 changed files with 19 additions and 10 deletions
|
@ -146,11 +146,11 @@ class SearchIndex:
|
||||||
total_referenced = []
|
total_referenced = []
|
||||||
cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
|
cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
|
||||||
if cache_item.result is not None:
|
if cache_item.result is not None:
|
||||||
return cache_item.result
|
return cache_item.result, False
|
||||||
async with cache_item.lock:
|
async with cache_item.lock:
|
||||||
if cache_item.result:
|
if cache_item.result:
|
||||||
return cache_item.result
|
return cache_item.result, False
|
||||||
response, offset, total = await self.search(**kwargs)
|
response, offset, total, timed_out = await self.search(**kwargs)
|
||||||
censored = {}
|
censored = {}
|
||||||
for row in response:
|
for row in response:
|
||||||
if (row.get('censor_type') or 0) >= Censor.SEARCH:
|
if (row.get('censor_type') or 0) >= Censor.SEARCH:
|
||||||
|
@ -159,7 +159,7 @@ class SearchIndex:
|
||||||
censored[censoring_channel_hash].add(row['tx_hash'])
|
censored[censoring_channel_hash].add(row['tx_hash'])
|
||||||
total_referenced.extend(response)
|
total_referenced.extend(response)
|
||||||
if censored:
|
if censored:
|
||||||
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
|
response, _, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
|
||||||
total_referenced.extend(response)
|
total_referenced.extend(response)
|
||||||
response = [self._make_resolve_result(r) for r in response]
|
response = [self._make_resolve_result(r) for r in response]
|
||||||
extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
|
extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
|
||||||
|
@ -167,7 +167,7 @@ class SearchIndex:
|
||||||
response, extra, offset, total, censored
|
response, extra, offset, total, censored
|
||||||
)
|
)
|
||||||
cache_item.result = result
|
cache_item.result = result
|
||||||
return result
|
return result, timed_out
|
||||||
|
|
||||||
async def get_many(self, *claim_ids):
|
async def get_many(self, *claim_ids):
|
||||||
await self.populate_claim_cache(*claim_ids)
|
await self.populate_claim_cache(*claim_ids)
|
||||||
|
@ -186,7 +186,7 @@ class SearchIndex:
|
||||||
try:
|
try:
|
||||||
return await self.search_ahead(**kwargs)
|
return await self.search_ahead(**kwargs)
|
||||||
except NotFoundError:
|
except NotFoundError:
|
||||||
return [], 0, 0
|
return [], 0, 0, False
|
||||||
# return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)
|
# return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)
|
||||||
|
|
||||||
async def search_ahead(self, **kwargs):
|
async def search_ahead(self, **kwargs):
|
||||||
|
@ -196,20 +196,25 @@ class SearchIndex:
|
||||||
page_size = kwargs.pop('limit', 10)
|
page_size = kwargs.pop('limit', 10)
|
||||||
offset = kwargs.pop('offset', 0)
|
offset = kwargs.pop('offset', 0)
|
||||||
kwargs['limit'] = 1000
|
kwargs['limit'] = 1000
|
||||||
|
timed_out = None
|
||||||
cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
|
cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
|
||||||
if cache_item.result is not None:
|
if cache_item.result is not None:
|
||||||
reordered_hits = cache_item.result
|
reordered_hits = cache_item.result
|
||||||
|
timed_out = False
|
||||||
else:
|
else:
|
||||||
async with cache_item.lock:
|
async with cache_item.lock:
|
||||||
if cache_item.result:
|
if cache_item.result:
|
||||||
reordered_hits = cache_item.result
|
reordered_hits = cache_item.result
|
||||||
|
timed_out = False
|
||||||
else:
|
else:
|
||||||
query = expand_query(**kwargs)
|
query = expand_query(**kwargs)
|
||||||
search_hits = deque((await self.search_client.search(
|
es_resp = await self.search_client.search(
|
||||||
query, index=self.index, track_total_hits=False,
|
query, index=self.index, track_total_hits=False,
|
||||||
timeout=f'{int(1000*self.search_timeout)}ms',
|
timeout=f'{int(1000*self.search_timeout)}ms',
|
||||||
_source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
|
_source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
|
||||||
))['hits']['hits'])
|
)
|
||||||
|
search_hits = deque(es_resp['hits']['hits'])
|
||||||
|
timed_out = es_resp['timed_out']
|
||||||
if remove_duplicates:
|
if remove_duplicates:
|
||||||
search_hits = self.__remove_duplicates(search_hits)
|
search_hits = self.__remove_duplicates(search_hits)
|
||||||
if per_channel_per_page > 0:
|
if per_channel_per_page > 0:
|
||||||
|
@ -218,7 +223,7 @@ class SearchIndex:
|
||||||
reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
|
reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
|
||||||
cache_item.result = reordered_hits
|
cache_item.result = reordered_hits
|
||||||
result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
|
result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
|
||||||
return result, 0, len(reordered_hits)
|
return result, 0, len(reordered_hits), timed_out
|
||||||
|
|
||||||
def __remove_duplicates(self, search_hits: deque) -> deque:
|
def __remove_duplicates(self, search_hits: deque) -> deque:
|
||||||
known_ids = {} # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
|
known_ids = {} # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
|
||||||
|
|
|
@ -1261,7 +1261,11 @@ class LBRYElectrumX(asyncio.Protocol):
|
||||||
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
|
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
|
||||||
return Outputs.to_base64([], [])
|
return Outputs.to_base64([], [])
|
||||||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||||
return await self.session_manager.search_index.cached_search(kwargs)
|
|
||||||
|
result, timed_out = await self.session_manager.search_index.cached_search(kwargs)
|
||||||
|
if timed_out:
|
||||||
|
self.session_manager.interrupt_count_metric.inc()
|
||||||
|
return result
|
||||||
except ConnectionTimeout:
|
except ConnectionTimeout:
|
||||||
self.session_manager.interrupt_count_metric.inc()
|
self.session_manager.interrupt_count_metric.inc()
|
||||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||||
|
|
Loading…
Reference in a new issue