backport fixes from testing server

This commit is contained in:
Victor Shyba 2021-01-29 23:38:15 -03:00
parent 5bc1a66572
commit 7674a0a91e

View file

@ -93,18 +93,19 @@ class SearchIndex:
"params": blockdict
}
return update
sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
if filtered_streams:
await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), request_timeout=120, slices=32)
await self.client.indices.refresh(self.index, request_timeout=120)
await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), request_timeout=sync_timeout, slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
if filtered_channels:
await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), request_timeout=120, slices=32)
await self.client.indices.refresh(self.index, request_timeout=120)
await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), request_timeout=sync_timeout, slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
if blocked_streams:
await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), request_timeout=120, slices=32)
await self.client.indices.refresh(self.index, request_timeout=120)
await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), request_timeout=sync_timeout, slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
if blocked_channels:
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), request_timeout=120, slices=32)
await self.client.indices.refresh(self.index, request_timeout=120)
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), request_timeout=sync_timeout, slices=32)
await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
async def update(self, claims):
if not claims:
@ -138,13 +139,17 @@ class SearchIndex:
async def session_query(self, query_name, function, kwargs):
offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0
total_referenced = []
if query_name == 'resolve':
response, censored, censor = await self.resolve(*kwargs)
total_referenced, response, censor = await self.resolve(*kwargs)
else:
censor = Censor(Censor.SEARCH)
response, offset, total = await self.search(**kwargs)
censored = censor.apply(response)
return Outputs.to_base64(censored, await self._get_referenced_rows(response), offset, total, censor)
response, offset, total = await self.search(**kwargs, censor_type=0)
total_referenced.extend(response)
censored_response, _, _ = await self.search(**kwargs, censor_type='>0')
censor.apply(censored_response)
total_referenced.extend(censored_response)
return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)
async def resolve(self, *urls):
censor = Censor(Censor.RESOLVE)
@ -197,7 +202,7 @@ class SearchIndex:
query['order_by'] = ['effective_amount', '^height']
else:
query['order_by'] = ['^channel_join']
query['channel_hash'] = channel['claim_hash']
query['channel_id'] = channel['claim_id']
query['signature_valid'] = True
elif set(query) == {'name'}:
query['is_controlling'] = True
@ -278,7 +283,7 @@ def expand_query(**kwargs):
query = {'must': [], 'must_not': []}
collapse = None
for key, value in kwargs.items():
if not value:
if value is None or isinstance(value, list) and len(value) == 0:
continue
key = key.replace('claim.', '')
many = key.endswith('__in') or isinstance(value, list)