resolve lru cache
This commit is contained in:
parent
09e0d5c55e
commit
15ac2ade59
3 changed files with 11 additions and 27 deletions
|
@ -8,9 +8,10 @@ from prometheus_client import Gauge, Histogram
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
import lbry
|
import lbry
|
||||||
|
from lbry.schema.url import URL
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
||||||
|
from lbry.utils import LRUCache
|
||||||
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
||||||
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
||||||
from lbry.wallet.server.daemon import DaemonError
|
from lbry.wallet.server.daemon import DaemonError
|
||||||
|
@ -231,6 +232,7 @@ class BlockProcessor:
|
||||||
self.db_op_stack: Optional[RevertableOpStack] = None
|
self.db_op_stack: Optional[RevertableOpStack] = None
|
||||||
|
|
||||||
# self.search_cache = {}
|
# self.search_cache = {}
|
||||||
|
self.resolve_cache = LRUCache(2**16)
|
||||||
self.history_cache = {}
|
self.history_cache = {}
|
||||||
self.status_server = StatusServer()
|
self.status_server = StatusServer()
|
||||||
|
|
||||||
|
@ -1581,6 +1583,7 @@ class BlockProcessor:
|
||||||
self.pending_transaction_num_mapping.clear()
|
self.pending_transaction_num_mapping.clear()
|
||||||
self.pending_transactions.clear()
|
self.pending_transactions.clear()
|
||||||
self.pending_support_amount_change.clear()
|
self.pending_support_amount_change.clear()
|
||||||
|
self.resolve_cache.clear()
|
||||||
|
|
||||||
async def backup_block(self):
|
async def backup_block(self):
|
||||||
assert len(self.db.prefix_db._op_stack) == 0
|
assert len(self.db.prefix_db._op_stack) == 0
|
||||||
|
|
|
@ -51,9 +51,7 @@ class SearchIndex:
|
||||||
self.index = index_prefix + 'claims'
|
self.index = index_prefix + 'claims'
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
self.claim_cache = LRUCache(2 ** 15)
|
self.claim_cache = LRUCache(2 ** 15)
|
||||||
self.short_id_cache = LRUCache(2 ** 17)
|
|
||||||
self.search_cache = LRUCache(2 ** 17)
|
self.search_cache = LRUCache(2 ** 17)
|
||||||
self.resolution_cache = LRUCache(2 ** 17)
|
|
||||||
self._elastic_host = elastic_host
|
self._elastic_host = elastic_host
|
||||||
self._elastic_port = elastic_port
|
self._elastic_port = elastic_port
|
||||||
self._trending_half_life = half_life
|
self._trending_half_life = half_life
|
||||||
|
@ -260,9 +258,7 @@ class SearchIndex:
|
||||||
|
|
||||||
def clear_caches(self):
|
def clear_caches(self):
|
||||||
self.search_cache.clear()
|
self.search_cache.clear()
|
||||||
self.short_id_cache.clear()
|
|
||||||
self.claim_cache.clear()
|
self.claim_cache.clear()
|
||||||
self.resolution_cache.clear()
|
|
||||||
|
|
||||||
async def cached_search(self, kwargs):
|
async def cached_search(self, kwargs):
|
||||||
total_referenced = []
|
total_referenced = []
|
||||||
|
@ -354,21 +350,6 @@ class SearchIndex:
|
||||||
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
|
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
|
||||||
self.claim_cache.set(result['claim_id'], result)
|
self.claim_cache.set(result['claim_id'], result)
|
||||||
|
|
||||||
async def full_id_from_short_id(self, name, short_id, channel_id=None):
|
|
||||||
key = '#'.join((channel_id or '', name, short_id))
|
|
||||||
if key not in self.short_id_cache:
|
|
||||||
query = {'name': name, 'claim_id': short_id}
|
|
||||||
if channel_id:
|
|
||||||
query['channel_id'] = channel_id
|
|
||||||
query['order_by'] = ['^channel_join']
|
|
||||||
query['signature_valid'] = True
|
|
||||||
else:
|
|
||||||
query['order_by'] = '^creation_height'
|
|
||||||
result, _, _ = await self.search(**query, limit=1)
|
|
||||||
if len(result) == 1:
|
|
||||||
result = result[0]['claim_id']
|
|
||||||
self.short_id_cache[key] = result
|
|
||||||
return self.short_id_cache.get(key, None)
|
|
||||||
|
|
||||||
async def search(self, **kwargs):
|
async def search(self, **kwargs):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -1036,11 +1036,16 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.session_mgr.pending_query_metric.dec()
|
self.session_mgr.pending_query_metric.dec()
|
||||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||||
|
|
||||||
def _claimtrie_resolve(self, *urls):
|
async def _cached_resolve_url(self, url):
|
||||||
|
if url not in self.bp.resolve_cache:
|
||||||
|
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||||
|
return self.bp.resolve_cache[url]
|
||||||
|
|
||||||
|
async def claimtrie_resolve(self, *urls):
|
||||||
rows, extra = [], []
|
rows, extra = [], []
|
||||||
for url in urls:
|
for url in urls:
|
||||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
self.session_mgr.urls_to_resolve_count_metric.inc()
|
||||||
stream, channel, repost, reposted_channel = self.db._resolve(url)
|
stream, channel, repost, reposted_channel = await self._cached_resolve_url(url)
|
||||||
if isinstance(channel, ResolveCensoredError):
|
if isinstance(channel, ResolveCensoredError):
|
||||||
rows.append(channel)
|
rows.append(channel)
|
||||||
extra.append(channel.censor_row)
|
extra.append(channel.censor_row)
|
||||||
|
@ -1067,11 +1072,6 @@ class LBRYElectrumX(SessionBase):
|
||||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
||||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
return Outputs.to_base64(rows, extra, 0, None, None)
|
||||||
|
|
||||||
async def claimtrie_resolve(self, *urls):
|
|
||||||
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
|
|
||||||
self.session_mgr.resolved_url_count_metric.inc(len(urls))
|
|
||||||
return result
|
|
||||||
|
|
||||||
async def get_server_height(self):
|
async def get_server_height(self):
|
||||||
return self.bp.height
|
return self.bp.height
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue