make better resolve cache

This commit is contained in:
Victor Shyba 2021-03-05 04:32:48 -03:00
parent 6b193ab350
commit 2641a9abe5
2 changed files with 66 additions and 36 deletions

View file

@ -55,6 +55,14 @@ class PathSegment(NamedTuple):
def normalized(self):
return normalize_name(self.name)
@property
def is_shortid(self):
return self.claim_id is not None and len(self.claim_id) < 40
@property
def is_fullid(self):
return self.claim_id is not None and len(self.claim_id) == 40
def to_dict(self):
q = {'name': self.name}
if self.claim_id is not None:

View file

@ -9,7 +9,7 @@ from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from elasticsearch.helpers import async_streaming_bulk
from lbry.crypto.base58 import Base58
from lbry.error import ResolveCensoredError, claim_id
from lbry.error import ResolveCensoredError, claim_id as parse_claim_id
from lbry.schema.result import Outputs, Censor
from lbry.schema.tags import clean_tags
from lbry.schema.url import URL, normalize_name
@ -24,8 +24,8 @@ class SearchIndex:
self.index = index_prefix + 'claims'
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
self.logger = class_logger(__name__, self.__class__.__name__)
self.search_cache = LRUCache(2 ** 16)
self.channel_cache = LRUCache(2 ** 16)
self.claim_cache = LRUCache(2 ** 15) # invalidated on touched
self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever
async def start(self):
if self.client:
@ -97,11 +97,18 @@ class SearchIndex:
async def claim_consumer(self, claim_producer):
await self.client.indices.refresh(self.index)
touched = set()
async for ok, item in async_streaming_bulk(self.client, self._consume_claim_producer(claim_producer)):
if not ok:
self.logger.warning("indexing failed for an item: %s", item)
else:
item = item.popitem()[1]
touched.add(item['_id'])
await self.client.indices.refresh(self.index)
await self.client.indices.flush(self.index)
for claim_id in touched:
if claim_id in self.claim_cache:
self.claim_cache.pop(claim_id)
self.logger.info("Indexing done.")
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
@ -112,6 +119,9 @@ class SearchIndex:
update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
else:
update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
for claim_id in blockdict:
if claim_id in self.claim_cache:
self.claim_cache.pop(claim_id)
key = 'channel_id' if channels else 'claim_id'
update['script'] = {
"source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_hash=params[ctx._source.{key}]",
@ -135,8 +145,6 @@ class SearchIndex:
await self.client.indices.refresh(self.index)
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
await self.client.indices.refresh(self.index)
self.search_cache.clear()
self.channel_cache.clear()
async def delete_above_height(self, height):
await self.client.delete_by_query(self.index, expand_query(height='>'+str(height)))
@ -168,15 +176,32 @@ class SearchIndex:
return results, censored, censor
async def get_many(self, *claim_ids):
cached = {claim_id: self.search_cache.get(claim_id) for claim_id in claim_ids if claim_id in self.search_cache}
missing = [claim_id for claim_id in claim_ids if claim_id not in cached]
missing = [claim_id for claim_id in claim_ids if claim_id not in self.claim_cache]
if missing:
results = await self.client.mget(index=self.index, body={"ids": missing},
_source_excludes=['description', 'title'])
results = expand_result(filter(lambda doc: doc['found'], results["docs"]))
for result in results:
self.search_cache.set(result['claim_id'], result)
return list(filter(None, map(self.search_cache.get, claim_ids)))
self.claim_cache.set(result['claim_id'], result)
return list(filter(None, map(self.claim_cache.get, claim_ids)))
async def full_id_from_short_id(self, name, short_id, channel_id=None):
key = (channel_id or '') + name + short_id
if key not in self.short_id_cache:
query = {'name': name, 'claim_id': short_id}
if channel_id:
query['channel_id'] = channel_id
query['order_by'] = ['^channel_join']
query['channel_id'] = channel_id
query['signature_valid'] = True
else:
query['order_by'] = '^creation_height'
result, _, _ = await self.search(**query, limit=1)
if len(result) == 1:
result = result[0]['claim_id']
self.short_id_cache[key] = result
return self.short_id_cache.get(key, None)
async def search(self, **kwargs):
if 'channel' in kwargs:
@ -217,23 +242,24 @@ class SearchIndex:
async def resolve_channel_id(self, url: URL):
if not url.has_channel:
return
key = 'cid:' + str(url.channel)
if key in self.channel_cache:
return self.channel_cache[key]
if url.channel.is_fullid:
return url.channel.claim_id
if url.channel.is_shortid:
channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id)
if not channel_id:
return LookupError(f'Could not find channel in "{url}".')
return channel_id
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
if len(query.get('claim_id', '')) != 40:
matches, _, _ = await self.search(**query, limit=1)
if matches:
channel_id = matches[0]['claim_id']
else:
return LookupError(f'Could not find channel in "{url}".')
matches, _, _ = await self.search(**query, limit=1)
if matches:
channel_id = matches[0]['claim_id']
else:
channel_id = query['claim_id']
self.channel_cache.set(key, channel_id)
return LookupError(f'Could not find channel in "{url}".')
return channel_id
async def resolve_stream(self, url: URL, channel_id: str = None):
@ -242,14 +268,14 @@ class SearchIndex:
if url.has_channel and channel_id is None:
return None
query = url.stream.to_dict()
stream = None
if 'claim_id' in query and len(query['claim_id']) == 40:
stream = (await self.get_many(query['claim_id']))
stream = stream[0] if len(stream) else None
else:
key = (channel_id or '') + str(url.stream)
if key in self.search_cache:
return self.search_cache[key]
if url.stream.claim_id is not None:
if url.stream.is_fullid:
claim_id = url.stream.claim_id
else:
claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id)
stream = await self.get_many(claim_id)
return stream[0] if len(stream) else None
if channel_id is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
@ -260,19 +286,15 @@ class SearchIndex:
query['signature_valid'] = True
elif set(query) == {'name'}:
query['is_controlling'] = True
if not stream:
matches, _, _ = await self.search(**query, limit=1)
if matches:
stream = matches[0]
key = (channel_id or '') + str(url.stream)
self.search_cache.set(key, stream)
return stream
matches, _, _ = await self.search(**query, limit=1)
if matches:
return matches[0]
async def _get_referenced_rows(self, txo_rows: List[dict]):
txo_rows = [row for row in txo_rows if isinstance(row, dict)]
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows)))
channel_hashes = set(filter(None, (row['channel_id'] for row in txo_rows)))
channel_hashes |= set(map(claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows))))
channel_hashes |= set(map(parse_claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows))))
reposted_txos = []
if repost_hashes: