general timeout

This commit is contained in:
Victor Shyba 2021-01-31 02:27:28 -03:00
parent d467dcfeaf
commit 84ff0b8a9f

View file

@ -20,9 +20,10 @@ class SearchIndex:
def __init__(self, index_prefix: str): def __init__(self, index_prefix: str):
self.client: Optional[AsyncElasticsearch] = None self.client: Optional[AsyncElasticsearch] = None
self.index = index_prefix + 'claims' self.index = index_prefix + 'claims'
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
async def start(self): async def start(self):
self.client = AsyncElasticsearch() self.client = AsyncElasticsearch(timeout=self.sync_timeout)
try: try:
if await self.client.indices.exists(self.index): if await self.client.indices.exists(self.index):
return return
@ -93,19 +94,18 @@ class SearchIndex:
"params": blockdict "params": blockdict
} }
return update return update
sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
if filtered_streams: if filtered_streams:
await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), request_timeout=sync_timeout, slices=32) await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout) await self.client.indices.refresh(self.index)
if filtered_channels: if filtered_channels:
await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), request_timeout=sync_timeout, slices=32) await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout) await self.client.indices.refresh(self.index)
if blocked_streams: if blocked_streams:
await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), request_timeout=sync_timeout, slices=32) await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout) await self.client.indices.refresh(self.index)
if blocked_channels: if blocked_channels:
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), request_timeout=sync_timeout, slices=32) await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout) await self.client.indices.refresh(self.index)
async def update(self, claims): async def update(self, claims):
if not claims: if not claims:
@ -145,9 +145,10 @@ class SearchIndex:
total_referenced, response, censor = await self.resolve(*kwargs) total_referenced, response, censor = await self.resolve(*kwargs)
else: else:
censor = Censor(Censor.SEARCH) censor = Censor(Censor.SEARCH)
censored_response = asyncio.ensure_future(self.search(**kwargs, censor_type='>0'))
response, offset, total = await self.search(**kwargs, censor_type=0) response, offset, total = await self.search(**kwargs, censor_type=0)
total_referenced.extend(response) total_referenced.extend(response)
censored_response, _, _ = await self.search(**kwargs, censor_type='>0') censored_response, _, _ = await censored_response
censor.apply(censored_response) censor.apply(censored_response)
total_referenced.extend(censored_response) total_referenced.extend(censored_response)
return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor) return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)