diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index b539ff0..1204213 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -11,7 +11,7 @@ from scribe import PROMETHEUS_NAMESPACE from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from scribe.error.base import ChainError -from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256 +from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.prefetcher import Prefetcher @@ -125,6 +125,9 @@ class BlockchainProcessorService(BlockchainService): self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} + self.hashX_history_cache = LRUCache(100) + self.hashX_full_cache = LRUCache(100) + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -1244,14 +1247,30 @@ class BlockchainProcessorService(BlockchainService): self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes) + def _get_cached_hashX_history(self, hashX: bytes) -> str: + if hashX in self.hashX_full_cache: + return self.hashX_full_cache[hashX] + if hashX not in self.hashX_history_cache: + self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None) + else: + tx_nums = self.hashX_history_cache[hashX] + history = '' + for tx_num in tx_nums: + history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:' + self.hashX_full_cache[hashX] = history + return history + def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]): existing = self.db.prefix_db.hashX_status.get(hashX) if existing: self.db.prefix_db.hashX_status.stage_delete((hashX,), existing) - tx_nums = self.db.read_history(hashX, limit=None) + if hashX not in self.hashX_history_cache: + tx_nums = self.db.read_history(hashX, limit=None) + else: + tx_nums = self.hashX_history_cache[hashX] history = '' for tx_num in tx_nums: - history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' + history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:' for tx_hash, height in new_history: history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' if history: @@ -1262,11 +1281,7 @@ class BlockchainProcessorService(BlockchainService): existing = self.db.prefix_db.hashX_mempool_status.get(hashX) if existing: self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing) - tx_nums = self.db.read_history(hashX, limit=None) - history = '' - for tx_num in tx_nums: - history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' - history += self.mempool.mempool_history(hashX) + history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX) if history: status = sha256(history.encode()) self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) @@ -1463,6 +1478,8 @@ class BlockchainProcessorService(BlockchainService): self.pending_support_amount_change.clear() self.touched_hashXs.clear() self.mempool.clear() + self.hashX_history_cache.clear() + self.hashX_full_cache.clear() def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index e98b632..8d7dbd9 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -4,7 +4,7 @@ import attr import typing import logging from collections import defaultdict -from prometheus_client import Histogram +from prometheus_client import Histogram, Gauge import rocksdb.errors from scribe import PROMETHEUS_NAMESPACE from scribe.common import HISTOGRAM_BUCKETS @@ -39,6 +39,10 @@ mempool_process_time_metric = Histogram( "processed_mempool", "Time to process mempool and notify touched addresses", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) +mempool_tx_count_metric = Gauge("mempool_tx_count", "Transactions in mempool", namespace=NAMESPACE) +mempool_touched_address_count_metric = Gauge( + "mempool_touched_address_count", "Count of addresses touched by transactions in mempool", namespace=NAMESPACE +) class HubMemPool: @@ -48,6 +52,7 @@ class HubMemPool: self.logger = logging.getLogger(__name__) self.txs = {} self.raw_mempool = {} + self.tx_touches = {} self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key self.refresh_secs = refresh_secs self.mempool_process_time_metric = mempool_process_time_metric @@ -70,18 +75,18 @@ class HubMemPool: self.logger.error("failed to process mempool, retrying later") return set() raise err - # hashXs = self.hashXs # hashX: [tx_hash, ...] touched_hashXs = set() # Remove txs that aren't in mempool anymore for tx_hash in set(self.txs).difference(self.raw_mempool.keys()): tx = self.txs.pop(tx_hash) - tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs}) + tx_hashXs = self.tx_touches.pop(tx_hash) for hashX in tx_hashXs: - if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]: - self.touched_hashXs[hashX].remove(tx_hash) - if not self.touched_hashXs[hashX]: + if hashX in self.touched_hashXs: + if tx_hash in self.touched_hashXs[hashX]: + self.touched_hashXs[hashX].remove(tx_hash) + if not len(self.touched_hashXs[hashX]): self.touched_hashXs.pop(hashX) touched_hashXs.update(tx_hashXs) @@ -129,11 +134,16 @@ class HubMemPool: tx.fee = max(0, (sum(v for _, v in tx.prevouts) - sum(v for _, v in tx.out_pairs))) self.txs[tx_hash] = tx + self.tx_touches[tx_hash] = tx_touches = set() # print(f"added {tx_hash[::-1].hex()} reader to mempool") for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs): self.touched_hashXs[hashX].add(tx_hash) touched_hashXs.add(hashX) + tx_touches.add(hashX) + + mempool_tx_count_metric.set(len(self.txs)) + mempool_touched_address_count_metric.set(len(self.touched_hashXs)) return touched_hashXs def transaction_summaries(self, hashX):