apply search timeout

This commit is contained in:
Victor Shyba 2021-03-09 00:19:58 -03:00
parent 891b1e7782
commit 8f32303d07
2 changed files with 15 additions and 9 deletions

View file

@ -21,10 +21,12 @@ from lbry.wallet.server.util import class_logger
class SearchIndex: class SearchIndex:
def __init__(self, index_prefix: str): def __init__(self, index_prefix: str, search_timeout=3.0):
self.search_timeout = search_timeout
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
self.search_client: Optional[AsyncElasticsearch] = None
self.client: Optional[AsyncElasticsearch] = None self.client: Optional[AsyncElasticsearch] = None
self.index = index_prefix + 'claims' self.index = index_prefix + 'claims'
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.claim_cache = LRUCache(2 ** 15) # invalidated on touched self.claim_cache = LRUCache(2 ** 15) # invalidated on touched
self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever
@ -34,6 +36,7 @@ class SearchIndex:
if self.client: if self.client:
return return
self.client = AsyncElasticsearch(timeout=self.sync_timeout) self.client = AsyncElasticsearch(timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(timeout=self.search_timeout)
while True: while True:
try: try:
await self.client.cluster.health(wait_for_status='yellow') await self.client.cluster.health(wait_for_status='yellow')
@ -79,9 +82,9 @@ class SearchIndex:
return res.get('acknowledged', False) return res.get('acknowledged', False)
def stop(self): def stop(self):
client = self.client clients = [self.client, self.search_client]
self.client = None self.client, self.search_client = None, None
return asyncio.ensure_future(client.close()) return asyncio.ensure_future(asyncio.gather(*(client.close() for client in clients)))
def delete_index(self): def delete_index(self):
return self.client.indices.delete(self.index, ignore_unavailable=True) return self.client.indices.delete(self.index, ignore_unavailable=True)
@ -183,8 +186,9 @@ class SearchIndex:
async def get_many(self, *claim_ids): async def get_many(self, *claim_ids):
missing = [claim_id for claim_id in claim_ids if claim_id not in self.claim_cache] missing = [claim_id for claim_id in claim_ids if claim_id not in self.claim_cache]
if missing: if missing:
results = await self.client.mget(index=self.index, body={"ids": missing}, results = await self.search_client.mget(
_source_excludes=['description', 'title']) index=self.index, body={"ids": missing}, _source_excludes=['description', 'title']
)
results = expand_result(filter(lambda doc: doc['found'], results["docs"])) results = expand_result(filter(lambda doc: doc['found'], results["docs"]))
for result in results: for result in results:
self.claim_cache.set(result['claim_id'], result) self.claim_cache.set(result['claim_id'], result)
@ -220,7 +224,9 @@ class SearchIndex:
if cache_item.result: if cache_item.result:
result = json.loads(zlib.decompress(cache_item.result)) result = json.loads(zlib.decompress(cache_item.result))
else: else:
result = await self.client.search(expand_query(**kwargs), index=self.index, track_total_hits=200) result = await self.search_client.search(
expand_query(**kwargs), index=self.index, track_total_hits=200
)
cache_item.result = zlib.compress(json.dumps(result).encode(), 1) cache_item.result = zlib.compress(json.dumps(result).encode(), 1)
except NotFoundError: except NotFoundError:
# index has no docs, fixme: log something # index has no docs, fixme: log something

View file

@ -973,7 +973,7 @@ class LBRYLevelDB(LevelDB):
) )
# Search index # Search index
self.search_index = SearchIndex(self.env.es_index_prefix) self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout)
def close(self): def close(self):
super().close() super().close()