diff --git a/hub/herald/search.py b/hub/herald/search.py index 0435bed..7a0e10b 100644 --- a/hub/herald/search.py +++ b/hub/herald/search.py @@ -10,6 +10,7 @@ from hub.schema.result import Censor, Outputs from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result from hub.db.common import ResolveResult if TYPE_CHECKING: + from prometheus_client import Counter as PrometheusCounter from hub.db import SecondaryDB @@ -29,9 +30,10 @@ class SearchIndex: VERSION = 1 def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', - elastic_port=9200): + elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None): self.hub_db = hub_db self.search_timeout = search_timeout + self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import self.search_client: Optional[AsyncElasticsearch] = None self.sync_client: Optional[AsyncElasticsearch] = None @@ -205,11 +207,14 @@ class SearchIndex: reordered_hits = cache_item.result else: query = expand_query(**kwargs) - search_hits = deque((await self.search_client.search( + es_resp = await self.search_client.search( query, index=self.index, track_total_hits=False, timeout=f'{int(1000*self.search_timeout)}ms', _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height'] - ))['hits']['hits']) + ) + search_hits = deque(es_resp['hits']['hits']) + if self.timeout_counter and es_resp['timed_out']: + self.timeout_counter.inc() if remove_duplicates: search_hits = self.__remove_duplicates(search_hits) if per_channel_per_page > 0: @@ -236,7 +241,7 @@ class SearchIndex: dropped.add(hit['_id']) return deque(hit for hit in search_hits if hit['_id'] not in dropped) - def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int): + def __search_ahead(self, search_hits: deque, page_size: int, per_channel_per_page: int) -> list: reordered_hits = [] channel_counters = Counter() next_page_hits_maybe_check_later = deque() diff --git a/hub/herald/session.py b/hub/herald/session.py index 8400714..73a6dbd 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -210,7 +210,8 @@ class SessionManager: # Search index self.search_index = SearchIndex( self.db, self.env.es_index_prefix, self.env.database_query_timeout, - elastic_host=env.elastic_host, elastic_port=env.elastic_port + elastic_host=env.elastic_host, elastic_port=env.elastic_port, + timeout_counter=self.interrupt_count_metric ) self.running = False # hashX: List[int]