Updates for review comments. Implement timeout counter bump in a different way.

parent 807e6151f2
commit 18e0571e81
2 changed files with 11 additions and 5 deletions
@@ -10,6 +10,7 @@ from hub.schema.result import Censor, Outputs
 from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
 from hub.db.common import ResolveResult
 if TYPE_CHECKING:
+    from prometheus_client import Counter as PrometheusCounter
     from hub.db import SecondaryDB
 
 
@@ -29,9 +30,10 @@ class SearchIndex:
     VERSION = 1
 
     def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
-                 elastic_port=9200):
+                 elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
         self.hub_db = hub_db
         self.search_timeout = search_timeout
+        self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
         self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         self.search_client: Optional[AsyncElasticsearch] = None
         self.sync_client: Optional[AsyncElasticsearch] = None
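For reference, the hunks above keep prometheus_client out of SearchIndex's hard runtime dependencies: the import lives under TYPE_CHECKING and the annotation is a quoted forward reference, so nothing is imported when the module actually runs. A minimal self-contained sketch of the same pattern (class and attribute names here are illustrative, not hub code):

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # evaluated only by type checkers, never at runtime
    from prometheus_client import Counter as PrometheusCounter

class Example:
    def __init__(self, timeout_counter: Optional['PrometheusCounter'] = None):
        # the quoted annotation is never resolved at runtime, so the
        # conditional import above is all that's needed
        self.timeout_counter = timeout_counter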
@@ -205,11 +207,14 @@ class SearchIndex:
             reordered_hits = cache_item.result
         else:
             query = expand_query(**kwargs)
-            search_hits = deque((await self.search_client.search(
+            es_resp = await self.search_client.search(
                 query, index=self.index, track_total_hits=False,
                 timeout=f'{int(1000*self.search_timeout)}ms',
                 _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
-            ))['hits']['hits'])
+            )
+            search_hits = deque(es_resp['hits']['hits'])
+            if self.timeout_counter and es_resp['timed_out']:
+                self.timeout_counter.inc()
             if remove_duplicates:
                 search_hits = self.__remove_duplicates(search_hits)
             if per_channel_per_page > 0:
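The check on es_resp['timed_out'] relies on Elasticsearch's soft-timeout behavior: when the per-request timeout fires, ES still answers with HTTP 200 and whatever partial hits it gathered, and reports the deadline only via timed_out: true in the response body rather than raising. A minimal sketch of the same flow, assuming the elasticsearch-py async client style used above (names are illustrative):

from collections import deque

async def search_with_timeout_metric(search_client, query, index, timeout_counter=None):
    resp = await search_client.search(
        query, index=index, track_total_hits=False, timeout='3000ms'
    )
    # a soft timeout shows up only in the body; the call itself succeeds
    if timeout_counter and resp['timed_out']:
        timeout_counter.inc()
    return deque(resp['hits']['hits'])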
@@ -236,7 +241,7 @@
                 dropped.add(hit['_id'])
         return deque(hit for hit in search_hits if hit['_id'] not in dropped)
 
-    def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
+    def __search_ahead(self, search_hits: deque, page_size: int, per_channel_per_page: int) -> list:
         reordered_hits = []
         channel_counters = Counter()
         next_page_hits_maybe_check_later = deque()
@@ -210,7 +210,8 @@ class SessionManager:
         # Search index
         self.search_index = SearchIndex(
             self.db, self.env.es_index_prefix, self.env.database_query_timeout,
-            elastic_host=env.elastic_host, elastic_port=env.elastic_port
+            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
+            timeout_counter=self.interrupt_count_metric
         )
         self.running = False
         # hashX: List[int]
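The session manager reuses its existing interrupt_count_metric rather than creating a new counter. For context, a prometheus_client Counter is a monotonically increasing metric whose inc() is safe to call from anywhere, and the default registry exposes it with a _total suffix. A quick self-contained illustration (the metric name here is invented, not from hub):

from prometheus_client import Counter, generate_latest

search_timeouts = Counter('es_search_timeouts', 'searches that hit the ES timeout')
search_timeouts.inc()  # what SearchIndex.search() now does on a timed-out response

# the default registry renders the counter with a '_total' suffix:
print(generate_latest().decode())  # ... es_search_timeouts_total 1.0 ...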