diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index d51cf5949..d38ba731f 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -20,9 +20,10 @@ class SearchIndex: def __init__(self, index_prefix: str): self.client: Optional[AsyncElasticsearch] = None self.index = index_prefix + 'claims' + self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import async def start(self): - self.client = AsyncElasticsearch() + self.client = AsyncElasticsearch(timeout=self.sync_timeout) try: if await self.client.indices.exists(self.index): return @@ -93,19 +94,18 @@ class SearchIndex: "params": blockdict } return update - sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import if filtered_streams: - await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), request_timeout=sync_timeout, slices=32) - await self.client.indices.refresh(self.index, request_timeout=sync_timeout) + await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), slices=32) + await self.client.indices.refresh(self.index) if filtered_channels: - await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), request_timeout=sync_timeout, slices=32) - await self.client.indices.refresh(self.index, request_timeout=sync_timeout) + await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), slices=32) + await self.client.indices.refresh(self.index) if blocked_streams: - await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), request_timeout=sync_timeout, slices=32) - await self.client.indices.refresh(self.index, request_timeout=sync_timeout) + await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), slices=32) + await self.client.indices.refresh(self.index) if blocked_channels: - await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), request_timeout=sync_timeout, slices=32) - await self.client.indices.refresh(self.index, request_timeout=sync_timeout) + await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32) + await self.client.indices.refresh(self.index) async def update(self, claims): if not claims: @@ -145,9 +145,10 @@ class SearchIndex: total_referenced, response, censor = await self.resolve(*kwargs) else: censor = Censor(Censor.SEARCH) + censored_response = asyncio.ensure_future(self.search(**kwargs, censor_type='>0')) response, offset, total = await self.search(**kwargs, censor_type=0) total_referenced.extend(response) - censored_response, _, _ = await self.search(**kwargs, censor_type='>0') + censored_response, _, _ = await censored_response censor.apply(censored_response) total_referenced.extend(censored_response) return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)