resolve lru cache

This commit is contained in:
Jack Robison 2021-10-13 10:18:03 -04:00
parent f5e0ef5223
commit eabcc30367
3 changed files with 11 additions and 27 deletions

View file

@ -8,9 +8,10 @@ from prometheus_client import Gauge, Histogram
from collections import defaultdict from collections import defaultdict
import lbry import lbry
from lbry.schema.url import URL
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.utils import LRUCache
from lbry.wallet.transaction import OutputScript, Output, Transaction from lbry.wallet.transaction import OutputScript, Output, Transaction
from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
@ -231,6 +232,7 @@ class BlockProcessor:
self.db_op_stack: Optional[RevertableOpStack] = None self.db_op_stack: Optional[RevertableOpStack] = None
# self.search_cache = {} # self.search_cache = {}
self.resolve_cache = LRUCache(2**16)
self.history_cache = {} self.history_cache = {}
self.status_server = StatusServer() self.status_server = StatusServer()
@ -1581,6 +1583,7 @@ class BlockProcessor:
self.pending_transaction_num_mapping.clear() self.pending_transaction_num_mapping.clear()
self.pending_transactions.clear() self.pending_transactions.clear()
self.pending_support_amount_change.clear() self.pending_support_amount_change.clear()
self.resolve_cache.clear()
async def backup_block(self): async def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0

View file

@ -51,9 +51,7 @@ class SearchIndex:
self.index = index_prefix + 'claims' self.index = index_prefix + 'claims'
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.claim_cache = LRUCache(2 ** 15) self.claim_cache = LRUCache(2 ** 15)
self.short_id_cache = LRUCache(2 ** 17)
self.search_cache = LRUCache(2 ** 17) self.search_cache = LRUCache(2 ** 17)
self.resolution_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host self._elastic_host = elastic_host
self._elastic_port = elastic_port self._elastic_port = elastic_port
self._trending_half_life = half_life self._trending_half_life = half_life
@ -260,9 +258,7 @@ class SearchIndex:
def clear_caches(self): def clear_caches(self):
self.search_cache.clear() self.search_cache.clear()
self.short_id_cache.clear()
self.claim_cache.clear() self.claim_cache.clear()
self.resolution_cache.clear()
async def cached_search(self, kwargs): async def cached_search(self, kwargs):
total_referenced = [] total_referenced = []
@ -354,21 +350,6 @@ class SearchIndex:
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])): for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
self.claim_cache.set(result['claim_id'], result) self.claim_cache.set(result['claim_id'], result)
async def full_id_from_short_id(self, name, short_id, channel_id=None):
key = '#'.join((channel_id or '', name, short_id))
if key not in self.short_id_cache:
query = {'name': name, 'claim_id': short_id}
if channel_id:
query['channel_id'] = channel_id
query['order_by'] = ['^channel_join']
query['signature_valid'] = True
else:
query['order_by'] = '^creation_height'
result, _, _ = await self.search(**query, limit=1)
if len(result) == 1:
result = result[0]['claim_id']
self.short_id_cache[key] = result
return self.short_id_cache.get(key, None)
async def search(self, **kwargs): async def search(self, **kwargs):
try: try:

View file

@ -1036,11 +1036,16 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.pending_query_metric.dec() self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
def _claimtrie_resolve(self, *urls): async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url]
async def claimtrie_resolve(self, *urls):
rows, extra = [], [] rows, extra = [], []
for url in urls: for url in urls:
self.session_mgr.urls_to_resolve_count_metric.inc() self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel, repost, reposted_channel = self.db._resolve(url) stream, channel, repost, reposted_channel = await self._cached_resolve_url(url)
if isinstance(channel, ResolveCensoredError): if isinstance(channel, ResolveCensoredError):
rows.append(channel) rows.append(channel)
extra.append(channel.censor_row) extra.append(channel.censor_row)
@ -1067,11 +1072,6 @@ class LBRYElectrumX(SessionBase):
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None) return Outputs.to_base64(rows, extra, 0, None, None)
async def claimtrie_resolve(self, *urls):
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
self.session_mgr.resolved_url_count_metric.inc(len(urls))
return result
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.bp.height