cache the encoded output instead

Victor Shyba 2021-03-10 12:45:47 -03:00
parent 20a5aecfca
commit 60a59407d8
2 changed files with 21 additions and 22 deletions

View file

@@ -1,14 +1,12 @@
 import asyncio
 import json
 import struct
-import zlib
 from binascii import hexlify, unhexlify
 from decimal import Decimal
 from operator import itemgetter
 from typing import Optional, List, Iterable

 from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
-from elasticsearch.exceptions import ConnectionTimeout
 from elasticsearch.helpers import async_streaming_bulk

 from lbry.crypto.base58 import Base58
@@ -162,17 +160,25 @@ class SearchIndex:
     async def session_query(self, query_name, kwargs):
         offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0
         total_referenced = []
+        cache_item = None
         if query_name == 'resolve':
             total_referenced, response, censor = await self.resolve(*kwargs)
         else:
-            censor = Censor(Censor.SEARCH)
-            response, offset, total = await self.search(**kwargs)
-            censor.apply(response)
-            total_referenced.extend(response)
-            if censor.censored:
-                response, _, _ = await self.search(**kwargs, censor_type=0)
-                total_referenced.extend(response)
-        return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)
+            cache_item = ResultCacheItem.from_cache(json.dumps(kwargs, sort_keys=True), self.search_cache)
+            async with cache_item.lock:
+                if cache_item.result:
+                    return cache_item.result
+                censor = Censor(Censor.SEARCH)
+                response, offset, total = await self.search(**kwargs)
+                censor.apply(response)
+                total_referenced.extend(response)
+                if censor.censored:
+                    response, _, _ = await self.search(**kwargs, censor_type=0)
+                    total_referenced.extend(response)
+        result = Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)
+        if cache_item:
+            cache_item.result = result
+        return result

     async def resolve(self, *urls):
         censor = Censor(Censor.RESOLVE)
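Note: ResultCacheItem is defined elsewhere in this module and not shown in this diff. A minimal sketch of the interface the new session_query relies on (from_cache, lock, result), assuming self.search_cache is a dict-like mapping such as an LRU cache:

import asyncio

class ResultCacheItem:
    # One entry per distinct query; the per-entry lock serializes concurrent
    # identical queries so only the first one performs the search.
    __slots__ = ('result', 'lock')

    def __init__(self):
        self.result = None          # the cached base64-encoded Outputs payload
        self.lock = asyncio.Lock()

    @classmethod
    def from_cache(cls, cache_key, cache):
        # 'cache' is assumed to be dict-like, keyed by the JSON-serialized
        # query kwargs.
        item = cache.get(cache_key)
        if item is None:
            item = cache[cache_key] = cls()
        return item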
@@ -219,18 +225,9 @@ class SearchIndex:
             return [], 0, 0
         kwargs['channel_id'] = result['claim_id']
         try:
-            expanded = expand_query(**kwargs)
-            cache_item = ResultCacheItem.from_cache(json.dumps(expanded, sort_keys=True), self.search_cache)
-            async with cache_item.lock:
-                if cache_item.result:
-                    result = json.loads(zlib.decompress(cache_item.result))
-                else:
-                    result = await self.search_client.search(
-                        expand_query(**kwargs), index=self.index, track_total_hits=200
-                    )
-                    cache_item.result = zlib.compress(json.dumps(result).encode(), 1)
-        except ConnectionTimeout:
-            raise TimeoutError()
+            result = await self.search_client.search(
+                expand_query(**kwargs), index=self.index, track_total_hits=200
+            )
         except NotFoundError:
             # index has no docs, fixme: log something
             return [], 0, 0
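Note: the removed branch cached the raw Elasticsearch response zlib-compressed, so even a cache hit paid decompression and JSON parsing, and then still re-ran censoring and Outputs.to_base64. A rough sketch of the per-hit work that goes away (the payload shape is made up for illustration):

import json
import zlib

# hypothetical cached response, stored the way the removed code stored it
raw_response = {"hits": {"hits": [], "total": {"value": 0}}}
cached = zlib.compress(json.dumps(raw_response).encode(), 1)

# before: every cache hit decompressed and re-parsed the response...
result = json.loads(zlib.decompress(cached))
# ...and then still had to censor and re-encode it into an Outputs payload.

# after: session_query caches the final base64 string itself, so a hit is a
# dict lookup that returns the already-encoded payload unchanged.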

View file

@@ -17,6 +17,8 @@ from functools import partial
 from binascii import hexlify
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

+from elasticsearch import ConnectionTimeout
+
 from prometheus_client import Counter, Info, Histogram, Gauge

 import lbry
@@ -1009,7 +1011,7 @@ class LBRYElectrumX(SessionBase):
         try:
             self.session_mgr.pending_query_metric.inc()
             return await self.db.search_index.session_query(query_name, kwargs)
-        except (TimeoutError, asyncio.TimeoutError):
+        except ConnectionTimeout:
             self.session_mgr.interrupt_count_metric.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
         finally:
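Note: with the per-key lock in session_query, identical concurrent queries coalesce: the first caller performs the search and later callers are served from the cache. A self-contained sketch of that pattern (all names here are hypothetical, not lbry APIs):

import asyncio

class ResultCacheItem:
    def __init__(self):
        self.lock = asyncio.Lock()
        self.result = None

async def cached_query(cache, key, do_search):
    item = cache.setdefault(key, ResultCacheItem())
    async with item.lock:
        if item.result is not None:
            return item.result           # cache hit: no search round trip
        item.result = await do_search()  # only the first caller gets here
        return item.result

async def main():
    cache, calls = {}, 0
    async def search():
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)        # stand-in for the Elasticsearch call
        return "base64-encoded-outputs"
    results = await asyncio.gather(
        *(cached_query(cache, "same-query", search) for _ in range(5)))
    assert calls == 1 and len(set(results)) == 1

asyncio.run(main())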