cache histories from the db for mempool updates

Jack Robison 2022-04-12 15:27:28 -04:00
parent cb5f396b33
commit ca1444eb5d
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
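
The change introduces two small LRU caches keyed by hashX: hashX_history_cache holds the raw tx_nums read from the db, and hashX_full_cache holds the fully formatted txid:height history string, so repeated mempool status updates for the same script hash no longer re-read its confirmed history from the db. A minimal, self-contained sketch of that pattern, assuming a stand-in LRUCache in place of scribe.common.LRUCache and placeholder read_history/format_entry callables in place of the real db lookups:

    from collections import OrderedDict
    from typing import Callable, List


    class LRUCache:
        """Tiny stand-in for scribe.common.LRUCache, just enough for this sketch."""

        def __init__(self, capacity: int):
            self._capacity = capacity
            self._items: OrderedDict = OrderedDict()

        def __contains__(self, key) -> bool:
            return key in self._items

        def __getitem__(self, key):
            self._items.move_to_end(key)
            return self._items[key]

        def __setitem__(self, key, value):
            self._items[key] = value
            self._items.move_to_end(key)
            if len(self._items) > self._capacity:
                self._items.popitem(last=False)  # evict the least recently used entry

        def clear(self):
            self._items.clear()


    class HashXHistoryCache:
        """Two-level cache: raw tx_nums plus the formatted history string."""

        def __init__(self, read_history: Callable[[bytes], List[int]],
                     format_entry: Callable[[int], str]):
            # read_history and format_entry are hypothetical placeholders for the
            # db calls (read_history / get_tx_hash + tx_counts) used in the diff below
            self._read_history = read_history
            self._format_entry = format_entry
            self.hashX_history_cache = LRUCache(100)  # hashX -> list of tx_nums
            self.hashX_full_cache = LRUCache(100)     # hashX -> "txid:height:..." string

        def get_cached_history(self, hashX: bytes) -> str:
            if hashX in self.hashX_full_cache:
                return self.hashX_full_cache[hashX]
            if hashX not in self.hashX_history_cache:
                self.hashX_history_cache[hashX] = tx_nums = self._read_history(hashX)
            else:
                tx_nums = self.hashX_history_cache[hashX]
            history = ''.join(self._format_entry(tx_num) for tx_num in tx_nums)
            self.hashX_full_cache[hashX] = history
            return history

        def clear(self):
            # dropped whenever a block is advanced, since the confirmed history changed
            self.hashX_history_cache.clear()
            self.hashX_full_cache.clear()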


@@ -11,7 +11,7 @@ from scribe import PROMETHEUS_NAMESPACE
 from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from scribe.error.base import ChainError
-from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256
+from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
 from scribe.blockchain.daemon import LBCDaemon
 from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
 from scribe.blockchain.prefetcher import Prefetcher
@@ -125,6 +125,9 @@ class BlockchainProcessorService(BlockchainService):
         self.pending_transaction_num_mapping: Dict[bytes, int] = {}
         self.pending_transactions: Dict[int, bytes] = {}
 
+        self.hashX_history_cache = LRUCache(100)
+        self.hashX_full_cache = LRUCache(100)
+
     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking. Shielded so that
         # cancellations from shutdown don't lose work - when the task
@@ -1244,14 +1247,30 @@ class BlockchainProcessorService(BlockchainService):
         self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
         self.removed_claims_to_send_es.update(self.removed_claim_hashes)
 
+    def _get_cached_hashX_history(self, hashX: bytes) -> str:
+        if hashX in self.hashX_full_cache:
+            return self.hashX_full_cache[hashX]
+        if hashX not in self.hashX_history_cache:
+            self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None)
+        else:
+            tx_nums = self.hashX_history_cache[hashX]
+        history = ''
+        for tx_num in tx_nums:
+            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+        self.hashX_full_cache[hashX] = history
+        return history
+
     def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
         existing = self.db.prefix_db.hashX_status.get(hashX)
         if existing:
             self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
-        tx_nums = self.db.read_history(hashX, limit=None)
+        if hashX not in self.hashX_history_cache:
+            tx_nums = self.db.read_history(hashX, limit=None)
+        else:
+            tx_nums = self.hashX_history_cache[hashX]
         history = ''
         for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         for tx_hash, height in new_history:
             history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
         if history:
@@ -1262,11 +1281,7 @@ class BlockchainProcessorService(BlockchainService):
         existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
         if existing:
             self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
-        tx_nums = self.db.read_history(hashX, limit=None)
-        history = ''
-        for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
-        history += self.mempool.mempool_history(hashX)
+        history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
         if history:
             status = sha256(history.encode())
             self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
@@ -1463,6 +1478,8 @@ class BlockchainProcessorService(BlockchainService):
         self.pending_support_amount_change.clear()
         self.touched_hashXs.clear()
         self.mempool.clear()
+        self.hashX_history_cache.clear()
+        self.hashX_full_cache.clear()
 
     def backup_block(self):
         assert len(self.db.prefix_db._op_stack) == 0
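
For context on the second hunk: the mempool status for a hashX is the sha256 of the cached confirmed history string concatenated with the mempool tail, and both caches are cleared when a block is flushed so the next update re-reads the changed history. A rough illustration of the status computation, with placeholder entries rather than real txids:

    from hashlib import sha256


    def hashX_mempool_status(confirmed_history: str, mempool_history: str) -> bytes:
        # both parts are concatenations of "txid:height:" fields; with the cache,
        # only the mempool tail has to be rebuilt on each update
        history = confirmed_history + mempool_history
        return sha256(history.encode()).digest() if history else b''


    # placeholder entries: one confirmed tx at height 1060000, one mempool tx
    status = hashX_mempool_status('ab' * 32 + ':1060000:', 'cd' * 32 + ':0:')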