Merge pull request #22 from lbryio/hashX-history-cache

cache histories from the db for mempool updates
This commit is contained in:
Jack Robison 2022-04-12 18:33:38 -04:00 committed by GitHub
commit ce50da9abe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 14 deletions

View file

@ -11,7 +11,7 @@ from scribe import PROMETHEUS_NAMESPACE
from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from scribe.error.base import ChainError from scribe.error.base import ChainError
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256 from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
from scribe.blockchain.prefetcher import Prefetcher from scribe.blockchain.prefetcher import Prefetcher
@ -125,6 +125,9 @@ class BlockchainProcessorService(BlockchainService):
self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {} self.pending_transactions: Dict[int, bytes] = {}
self.hashX_history_cache = LRUCache(100)
self.hashX_full_cache = LRUCache(100)
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -1244,14 +1247,30 @@ class BlockchainProcessorService(BlockchainService):
self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
self.removed_claims_to_send_es.update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes)
def _get_cached_hashX_history(self, hashX: bytes) -> str:
if hashX in self.hashX_full_cache:
return self.hashX_full_cache[hashX]
if hashX not in self.hashX_history_cache:
self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None)
else:
tx_nums = self.hashX_history_cache[hashX]
history = ''
for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
self.hashX_full_cache[hashX] = history
return history
def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]): def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
existing = self.db.prefix_db.hashX_status.get(hashX) existing = self.db.prefix_db.hashX_status.get(hashX)
if existing: if existing:
self.db.prefix_db.hashX_status.stage_delete((hashX,), existing) self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
tx_nums = self.db.read_history(hashX, limit=None) if hashX not in self.hashX_history_cache:
tx_nums = self.db.read_history(hashX, limit=None)
else:
tx_nums = self.hashX_history_cache[hashX]
history = '' history = ''
for tx_num in tx_nums: for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
for tx_hash, height in new_history: for tx_hash, height in new_history:
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
if history: if history:
@ -1262,11 +1281,7 @@ class BlockchainProcessorService(BlockchainService):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX) existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing: if existing:
self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing) self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
tx_nums = self.db.read_history(hashX, limit=None) history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
history = ''
for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
history += self.mempool.mempool_history(hashX)
if history: if history:
status = sha256(history.encode()) status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
@ -1463,6 +1478,8 @@ class BlockchainProcessorService(BlockchainService):
self.pending_support_amount_change.clear() self.pending_support_amount_change.clear()
self.touched_hashXs.clear() self.touched_hashXs.clear()
self.mempool.clear() self.mempool.clear()
self.hashX_history_cache.clear()
self.hashX_full_cache.clear()
def backup_block(self): def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0

View file

@ -4,7 +4,7 @@ import attr
import typing import typing
import logging import logging
from collections import defaultdict from collections import defaultdict
from prometheus_client import Histogram from prometheus_client import Histogram, Gauge
import rocksdb.errors import rocksdb.errors
from scribe import PROMETHEUS_NAMESPACE from scribe import PROMETHEUS_NAMESPACE
from scribe.common import HISTOGRAM_BUCKETS from scribe.common import HISTOGRAM_BUCKETS
@ -39,6 +39,10 @@ mempool_process_time_metric = Histogram(
"processed_mempool", "Time to process mempool and notify touched addresses", "processed_mempool", "Time to process mempool and notify touched addresses",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
mempool_tx_count_metric = Gauge("mempool_tx_count", "Transactions in mempool", namespace=NAMESPACE)
mempool_touched_address_count_metric = Gauge(
"mempool_touched_address_count", "Count of addresses touched by transactions in mempool", namespace=NAMESPACE
)
class HubMemPool: class HubMemPool:
@ -48,6 +52,7 @@ class HubMemPool:
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.txs = {} self.txs = {}
self.raw_mempool = {} self.raw_mempool = {}
self.tx_touches = {}
self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key
self.refresh_secs = refresh_secs self.refresh_secs = refresh_secs
self.mempool_process_time_metric = mempool_process_time_metric self.mempool_process_time_metric = mempool_process_time_metric
@ -70,18 +75,18 @@ class HubMemPool:
self.logger.error("failed to process mempool, retrying later") self.logger.error("failed to process mempool, retrying later")
return set() return set()
raise err raise err
# hashXs = self.hashXs # hashX: [tx_hash, ...] # hashXs = self.hashXs # hashX: [tx_hash, ...]
touched_hashXs = set() touched_hashXs = set()
# Remove txs that aren't in mempool anymore # Remove txs that aren't in mempool anymore
for tx_hash in set(self.txs).difference(self.raw_mempool.keys()): for tx_hash in set(self.txs).difference(self.raw_mempool.keys()):
tx = self.txs.pop(tx_hash) tx = self.txs.pop(tx_hash)
tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs}) tx_hashXs = self.tx_touches.pop(tx_hash)
for hashX in tx_hashXs: for hashX in tx_hashXs:
if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]: if hashX in self.touched_hashXs:
self.touched_hashXs[hashX].remove(tx_hash) if tx_hash in self.touched_hashXs[hashX]:
if not self.touched_hashXs[hashX]: self.touched_hashXs[hashX].remove(tx_hash)
if not len(self.touched_hashXs[hashX]):
self.touched_hashXs.pop(hashX) self.touched_hashXs.pop(hashX)
touched_hashXs.update(tx_hashXs) touched_hashXs.update(tx_hashXs)
@ -129,11 +134,16 @@ class HubMemPool:
tx.fee = max(0, (sum(v for _, v in tx.prevouts) - tx.fee = max(0, (sum(v for _, v in tx.prevouts) -
sum(v for _, v in tx.out_pairs))) sum(v for _, v in tx.out_pairs)))
self.txs[tx_hash] = tx self.txs[tx_hash] = tx
self.tx_touches[tx_hash] = tx_touches = set()
# print(f"added {tx_hash[::-1].hex()} reader to mempool") # print(f"added {tx_hash[::-1].hex()} reader to mempool")
for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs): for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs):
self.touched_hashXs[hashX].add(tx_hash) self.touched_hashXs[hashX].add(tx_hash)
touched_hashXs.add(hashX) touched_hashXs.add(hashX)
tx_touches.add(hashX)
mempool_tx_count_metric.set(len(self.txs))
mempool_touched_address_count_metric.set(len(self.touched_hashXs))
return touched_hashXs return touched_hashXs
def transaction_summaries(self, hashX): def transaction_summaries(self, hashX):