From 84ff0b8a9f3b7f8b930c94e8f3fce8747b306ea6 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sun, 31 Jan 2021 02:27:28 -0300
Subject: [PATCH] general timeout

---
 lbry/wallet/server/db/elastic_search.py | 23 ++++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py
index d51cf5949..d38ba731f 100644
--- a/lbry/wallet/server/db/elastic_search.py
+++ b/lbry/wallet/server/db/elastic_search.py
@@ -20,9 +20,10 @@ class SearchIndex:
     def __init__(self, index_prefix: str):
         self.client: Optional[AsyncElasticsearch] = None
         self.index = index_prefix + 'claims'
+        self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
 
     async def start(self):
-        self.client = AsyncElasticsearch()
+        self.client = AsyncElasticsearch(timeout=self.sync_timeout)
         try:
             if await self.client.indices.exists(self.index):
                 return
@@ -93,19 +94,18 @@ class SearchIndex:
                 "params": blockdict
             }
             return update
-        sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         if filtered_streams:
-            await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), request_timeout=sync_timeout, slices=32)
-            await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
+            await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), slices=32)
+            await self.client.indices.refresh(self.index)
         if filtered_channels:
-            await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), request_timeout=sync_timeout, slices=32)
-            await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
+            await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), slices=32)
+            await self.client.indices.refresh(self.index)
         if blocked_streams:
-            await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), request_timeout=sync_timeout, slices=32)
-            await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
+            await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), slices=32)
+            await self.client.indices.refresh(self.index)
         if blocked_channels:
-            await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), request_timeout=sync_timeout, slices=32)
-            await self.client.indices.refresh(self.index, request_timeout=sync_timeout)
+            await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
+            await self.client.indices.refresh(self.index)
 
     async def update(self, claims):
         if not claims:
@@ -145,9 +145,10 @@ class SearchIndex:
             total_referenced, response, censor = await self.resolve(*kwargs)
         else:
             censor = Censor(Censor.SEARCH)
+            censored_response = asyncio.ensure_future(self.search(**kwargs, censor_type='>0'))
             response, offset, total = await self.search(**kwargs, censor_type=0)
             total_referenced.extend(response)
-            censored_response, _, _ = await self.search(**kwargs, censor_type='>0')
+            censored_response, _, _ = await censored_response
             censor.apply(censored_response)
             total_referenced.extend(censored_response)
         return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)