This commit is contained in:
Jack Robison 2022-07-28 19:32:15 -04:00
parent 0918299163
commit 9df8f9c651
3 changed files with 19 additions and 11 deletions

View file

@ -62,7 +62,7 @@ class LBCDaemon:
self.connector = aiohttp.TCPConnector(ssl_context=ssl_context) self.connector = aiohttp.TCPConnector(ssl_context=ssl_context)
else: else:
self.connector = aiohttp.TCPConnector(ssl=False) self.connector = aiohttp.TCPConnector(ssl=False)
self._block_hash_cache = LRUCacheWithMetrics(1024) self._block_hash_cache = LRUCacheWithMetrics(1024, metric_name='block_hash', namespace=NAMESPACE)
self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE) self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE)
async def close(self): async def close(self):

View file

@ -7,7 +7,7 @@ class BlockchainEnv(Env):
blocking_channel_ids=None, filtering_channel_ids=None, blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None, index_address_status=None, rebuild_address_status_from_height=None,
daemon_ca_path=None): daemon_ca_path=None, history_tx_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files self.db_max_open_files = db_max_open_files
@ -17,6 +17,8 @@ class BlockchainEnv(Env):
self.rebuild_address_status_from_height = rebuild_address_status_from_height \ self.rebuild_address_status_from_height = rebuild_address_status_from_height \
if isinstance(rebuild_address_status_from_height, int) else -1 if isinstance(rebuild_address_status_from_height, int) else -1
self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None
self.history_tx_cache_size = history_tx_cache_size if history_tx_cache_size is not None else \
self.integer('HISTORY_TX_CACHE_SIZE', 262144)
@classmethod @classmethod
def contribute_to_arg_parser(cls, parser): def contribute_to_arg_parser(cls, parser):
@ -39,6 +41,10 @@ class BlockchainEnv(Env):
parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1, parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1,
help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a " help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a "
"block height to start reindexing from. Defaults to -1 (off).") "block height to start reindexing from. Defaults to -1 (off).")
parser.add_argument('--history_tx_cache_size', type=int,
default=cls.integer('HISTORY_TX_CACHE_SIZE', 262144),
help="Size of the lfu cache of txids in transaction histories for addresses. "
"Can be set in the env with 'TX_CACHE_SIZE'")
@classmethod @classmethod
def from_arg_parser(cls, args): def from_arg_parser(cls, args):
@ -49,5 +55,5 @@ class BlockchainEnv(Env):
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size, hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height, rebuild_address_status_from_height=args.rebuild_address_status_from_height,
daemon_ca_path=args.daemon_ca_path daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size
) )

View file

@ -11,7 +11,7 @@ from hub import PROMETHEUS_NAMESPACE
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LRUCache from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
from hub.scribe.db import PrimaryDB from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@ -122,9 +122,9 @@ class BlockchainProcessorService(BlockchainService):
self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {} self.pending_transactions: Dict[int, bytes] = {}
self.hashX_history_cache = LFUCache(max(100, env.hashX_history_cache_size)) self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE)
self.hashX_full_cache = LFUCache(max(100, env.hashX_history_cache_size)) self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE)
self.history_tx_info_cache = LFUCache(2 ** 17) self.history_tx_info_cache = LFUCacheWithMetrics(max(100, env.history_tx_cache_size), 'hashX_tx', NAMESPACE)
def open_db(self): def open_db(self):
env = self.env env = self.env
@ -1274,8 +1274,9 @@ class BlockchainProcessorService(BlockchainService):
append_needed_tx_info = needed_tx_infos.append append_needed_tx_info = needed_tx_infos.append
tx_infos = {} tx_infos = {}
for tx_num in tx_nums: for tx_num in tx_nums:
if tx_num in self.history_tx_info_cache: cached_tx_info = self.history_tx_info_cache.get(tx_num)
tx_infos[tx_num] = self.history_tx_info_cache[tx_num] if cached_tx_info is not None:
tx_infos[tx_num] = cached_tx_info
else: else:
append_needed_tx_info(tx_num) append_needed_tx_info(tx_num)
if needed_tx_infos: if needed_tx_infos:
@ -1502,8 +1503,9 @@ class BlockchainProcessorService(BlockchainService):
append_needed_tx_info = needed_tx_infos.append append_needed_tx_info = needed_tx_infos.append
tx_infos = {} tx_infos = {}
for tx_num in tx_nums: for tx_num in tx_nums:
if tx_num in self.history_tx_info_cache: cached_tx_info = self.history_tx_info_cache.get(tx_num)
tx_infos[tx_num] = self.history_tx_info_cache[tx_num] if cached_tx_info is not None:
tx_infos[tx_num] = cached_tx_info
else: else:
append_needed_tx_info(tx_num) append_needed_tx_info(tx_num)
if needed_tx_infos: if needed_tx_infos: