diff --git a/hub/scribe/daemon.py b/hub/scribe/daemon.py index 91835e3..568e335 100644 --- a/hub/scribe/daemon.py +++ b/hub/scribe/daemon.py @@ -62,7 +62,7 @@ class LBCDaemon: self.connector = aiohttp.TCPConnector(ssl_context=ssl_context) else: self.connector = aiohttp.TCPConnector(ssl=False) - self._block_hash_cache = LRUCacheWithMetrics(1024) + self._block_hash_cache = LRUCacheWithMetrics(1024, metric_name='block_hash', namespace=NAMESPACE) self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE) async def close(self): diff --git a/hub/scribe/env.py b/hub/scribe/env.py index 746679c..281f18b 100644 --- a/hub/scribe/env.py +++ b/hub/scribe/env.py @@ -7,7 +7,7 @@ class BlockchainEnv(Env): blocking_channel_ids=None, filtering_channel_ids=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, index_address_status=None, rebuild_address_status_from_height=None, - daemon_ca_path=None): + daemon_ca_path=None, history_tx_cache_size=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files @@ -17,6 +17,8 @@ class BlockchainEnv(Env): self.rebuild_address_status_from_height = rebuild_address_status_from_height \ if isinstance(rebuild_address_status_from_height, int) else -1 self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None + self.history_tx_cache_size = history_tx_cache_size if history_tx_cache_size is not None else \ + self.integer('HISTORY_TX_CACHE_SIZE', 262144) @classmethod def contribute_to_arg_parser(cls, parser): @@ -39,6 +41,10 @@ class BlockchainEnv(Env): parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1, help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a " "block height to start reindexing from. Defaults to -1 (off).") + parser.add_argument('--history_tx_cache_size', type=int, + default=cls.integer('HISTORY_TX_CACHE_SIZE', 262144), + help="Size of the lfu cache of txids in transaction histories for addresses. " + "Can be set in the env with 'TX_CACHE_SIZE'") @classmethod def from_arg_parser(cls, args): @@ -49,5 +55,5 @@ class BlockchainEnv(Env): cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, hashX_history_cache_size=args.address_history_cache_size, rebuild_address_status_from_height=args.rebuild_address_status_from_height, - daemon_ca_path=args.daemon_ca_path + daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size ) diff --git a/hub/scribe/service.py b/hub/scribe/service.py index ec757f9..5585757 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -11,7 +11,7 @@ from hub import PROMETHEUS_NAMESPACE from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from hub.error.base import ChainError -from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LRUCache +from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics from hub.scribe.db import PrimaryDB from hub.scribe.daemon import LBCDaemon from hub.scribe.transaction import Tx, TxOutput, TxInput, Block @@ -122,9 +122,9 @@ class BlockchainProcessorService(BlockchainService): self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} - self.hashX_history_cache = LFUCache(max(100, env.hashX_history_cache_size)) - self.hashX_full_cache = LFUCache(max(100, env.hashX_history_cache_size)) - self.history_tx_info_cache = LFUCache(2 ** 17) + self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE) + self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE) + self.history_tx_info_cache = LFUCacheWithMetrics(max(100, env.history_tx_cache_size), 'hashX_tx', NAMESPACE) def open_db(self): env = self.env @@ -1274,8 +1274,9 @@ class BlockchainProcessorService(BlockchainService): append_needed_tx_info = needed_tx_infos.append tx_infos = {} for tx_num in tx_nums: - if tx_num in self.history_tx_info_cache: - tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + cached_tx_info = self.history_tx_info_cache.get(tx_num) + if cached_tx_info is not None: + tx_infos[tx_num] = cached_tx_info else: append_needed_tx_info(tx_num) if needed_tx_infos: @@ -1502,8 +1503,9 @@ class BlockchainProcessorService(BlockchainService): append_needed_tx_info = needed_tx_infos.append tx_infos = {} for tx_num in tx_nums: - if tx_num in self.history_tx_info_cache: - tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + cached_tx_info = self.history_tx_info_cache.get(tx_num) + if cached_tx_info is not None: + tx_infos[tx_num] = cached_tx_info else: append_needed_tx_info(tx_num) if needed_tx_infos: