From 15ac2ade59a78e5a00509b20446ab54e362869bc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Oct 2021 10:18:03 -0400 Subject: [PATCH] resolve lru cache --- lbry/wallet/server/block_processor.py | 5 ++++- lbry/wallet/server/db/elasticsearch/search.py | 19 ------------------- lbry/wallet/server/session.py | 14 +++++++------- 3 files changed, 11 insertions(+), 27 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e59b470db..f2c0bcf9a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -8,9 +8,10 @@ from prometheus_client import Gauge, Histogram from collections import defaultdict import lbry +from lbry.schema.url import URL from lbry.schema.claim import Claim from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger - +from lbry.utils import LRUCache from lbry.wallet.transaction import OutputScript, Output, Transaction from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.daemon import DaemonError @@ -231,6 +232,7 @@ class BlockProcessor: self.db_op_stack: Optional[RevertableOpStack] = None # self.search_cache = {} + self.resolve_cache = LRUCache(2**16) self.history_cache = {} self.status_server = StatusServer() @@ -1581,6 +1583,7 @@ class BlockProcessor: self.pending_transaction_num_mapping.clear() self.pending_transactions.clear() self.pending_support_amount_change.clear() + self.resolve_cache.clear() async def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 14b47677b..e7a8b58af 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -51,9 +51,7 @@ class SearchIndex: self.index = index_prefix + 'claims' self.logger = class_logger(__name__, self.__class__.__name__) self.claim_cache = LRUCache(2 ** 15) - self.short_id_cache = LRUCache(2 ** 17) self.search_cache = LRUCache(2 ** 17) - self.resolution_cache = LRUCache(2 ** 17) self._elastic_host = elastic_host self._elastic_port = elastic_port self._trending_half_life = half_life @@ -260,9 +258,7 @@ class SearchIndex: def clear_caches(self): self.search_cache.clear() - self.short_id_cache.clear() self.claim_cache.clear() - self.resolution_cache.clear() async def cached_search(self, kwargs): total_referenced = [] @@ -354,21 +350,6 @@ class SearchIndex: for result in expand_result(filter(lambda doc: doc['found'], results["docs"])): self.claim_cache.set(result['claim_id'], result) - async def full_id_from_short_id(self, name, short_id, channel_id=None): - key = '#'.join((channel_id or '', name, short_id)) - if key not in self.short_id_cache: - query = {'name': name, 'claim_id': short_id} - if channel_id: - query['channel_id'] = channel_id - query['order_by'] = ['^channel_join'] - query['signature_valid'] = True - else: - query['order_by'] = '^creation_height' - result, _, _ = await self.search(**query, limit=1) - if len(result) == 1: - result = result[0]['claim_id'] - self.short_id_cache[key] = result - return self.short_id_cache.get(key, None) async def search(self, **kwargs): try: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index c51fc76e4..f49ea5225 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1036,11 +1036,16 @@ class LBRYElectrumX(SessionBase): self.session_mgr.pending_query_metric.dec() self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) - def _claimtrie_resolve(self, *urls): + async def _cached_resolve_url(self, url): + if url not in self.bp.resolve_cache: + self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) + return self.bp.resolve_cache[url] + + async def claimtrie_resolve(self, *urls): rows, extra = [], [] for url in urls: self.session_mgr.urls_to_resolve_count_metric.inc() - stream, channel, repost, reposted_channel = self.db._resolve(url) + stream, channel, repost, reposted_channel = await self._cached_resolve_url(url) if isinstance(channel, ResolveCensoredError): rows.append(channel) extra.append(channel.censor_row) @@ -1067,11 +1072,6 @@ class LBRYElectrumX(SessionBase): # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) return Outputs.to_base64(rows, extra, 0, None, None) - async def claimtrie_resolve(self, *urls): - result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls) - self.session_mgr.resolved_url_count_metric.inc(len(urls)) - return result - async def get_server_height(self): return self.bp.height