forked from LBRYCommunity/lbry-sdk
resolve lru cache
This commit is contained in:
parent
e75047a0ab
commit
ba1d0a12d1
3 changed files with 11 additions and 27 deletions
|
@ -8,9 +8,10 @@ from prometheus_client import Gauge, Histogram
|
|||
from collections import defaultdict
|
||||
|
||||
import lbry
|
||||
from lbry.schema.url import URL
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
||||
|
||||
from lbry.utils import LRUCache
|
||||
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
||||
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
||||
from lbry.wallet.server.daemon import DaemonError
|
||||
|
@ -231,6 +232,7 @@ class BlockProcessor:
|
|||
self.db_op_stack: Optional[RevertableOpStack] = None
|
||||
|
||||
# self.search_cache = {}
|
||||
self.resolve_cache = LRUCache(2**16)
|
||||
self.history_cache = {}
|
||||
self.status_server = StatusServer()
|
||||
|
||||
|
@ -1581,6 +1583,7 @@ class BlockProcessor:
|
|||
self.pending_transaction_num_mapping.clear()
|
||||
self.pending_transactions.clear()
|
||||
self.pending_support_amount_change.clear()
|
||||
self.resolve_cache.clear()
|
||||
|
||||
async def backup_block(self):
|
||||
assert len(self.db.prefix_db._op_stack) == 0
|
||||
|
|
|
@ -51,9 +51,7 @@ class SearchIndex:
|
|||
self.index = index_prefix + 'claims'
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.claim_cache = LRUCache(2 ** 15)
|
||||
self.short_id_cache = LRUCache(2 ** 17)
|
||||
self.search_cache = LRUCache(2 ** 17)
|
||||
self.resolution_cache = LRUCache(2 ** 17)
|
||||
self._elastic_host = elastic_host
|
||||
self._elastic_port = elastic_port
|
||||
self._trending_half_life = half_life
|
||||
|
@ -260,9 +258,7 @@ class SearchIndex:
|
|||
|
||||
def clear_caches(self):
|
||||
self.search_cache.clear()
|
||||
self.short_id_cache.clear()
|
||||
self.claim_cache.clear()
|
||||
self.resolution_cache.clear()
|
||||
|
||||
async def cached_search(self, kwargs):
|
||||
total_referenced = []
|
||||
|
@ -354,21 +350,6 @@ class SearchIndex:
|
|||
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
|
||||
self.claim_cache.set(result['claim_id'], result)
|
||||
|
||||
async def full_id_from_short_id(self, name, short_id, channel_id=None):
|
||||
key = '#'.join((channel_id or '', name, short_id))
|
||||
if key not in self.short_id_cache:
|
||||
query = {'name': name, 'claim_id': short_id}
|
||||
if channel_id:
|
||||
query['channel_id'] = channel_id
|
||||
query['order_by'] = ['^channel_join']
|
||||
query['signature_valid'] = True
|
||||
else:
|
||||
query['order_by'] = '^creation_height'
|
||||
result, _, _ = await self.search(**query, limit=1)
|
||||
if len(result) == 1:
|
||||
result = result[0]['claim_id']
|
||||
self.short_id_cache[key] = result
|
||||
return self.short_id_cache.get(key, None)
|
||||
|
||||
async def search(self, **kwargs):
|
||||
try:
|
||||
|
|
|
@ -1036,11 +1036,16 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.pending_query_metric.dec()
|
||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
def _claimtrie_resolve(self, *urls):
|
||||
async def _cached_resolve_url(self, url):
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.bp.resolve_cache[url]
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
||||
stream, channel, repost, reposted_channel = self.db._resolve(url)
|
||||
stream, channel, repost, reposted_channel = await self._cached_resolve_url(url)
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
|
@ -1067,11 +1072,6 @@ class LBRYElectrumX(SessionBase):
|
|||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
|
||||
self.session_mgr.resolved_url_count_metric.inc(len(urls))
|
||||
return result
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.bp.height
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue