From ca1444eb5d68c0a1c7f9d824d10dd5f245d2dd92 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Tue, 12 Apr 2022 15:27:28 -0400
Subject: [PATCH 1/3] cache histories from the db for mempool updates

---
 scribe/blockchain/service.py | 33 +++++++++++++++++++++++++--------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py
index b539ff0..1204213 100644
--- a/scribe/blockchain/service.py
+++ b/scribe/blockchain/service.py
@@ -11,7 +11,7 @@ from scribe import PROMETHEUS_NAMESPACE
 from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from scribe.error.base import ChainError
-from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256
+from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
 from scribe.blockchain.daemon import LBCDaemon
 from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
 from scribe.blockchain.prefetcher import Prefetcher
@@ -125,6 +125,9 @@ class BlockchainProcessorService(BlockchainService):
         self.pending_transaction_num_mapping: Dict[bytes, int] = {}
         self.pending_transactions: Dict[int, bytes] = {}
 
+        self.hashX_history_cache = LRUCache(100)
+        self.hashX_full_cache = LRUCache(100)
+
     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking.  Shielded so that
         # cancellations from shutdown don't lose work - when the task
@@ -1244,14 +1247,30 @@ class BlockchainProcessorService(BlockchainService):
         self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
         self.removed_claims_to_send_es.update(self.removed_claim_hashes)
 
+    def _get_cached_hashX_history(self, hashX: bytes) -> str:
+        if hashX in self.hashX_full_cache:
+            return self.hashX_full_cache[hashX]
+        if hashX not in self.hashX_history_cache:
+            self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None)
+        else:
+            tx_nums = self.hashX_history_cache[hashX]
+        history = ''
+        for tx_num in tx_nums:
+            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+        self.hashX_full_cache[hashX] = history
+        return history
+
     def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
         existing = self.db.prefix_db.hashX_status.get(hashX)
         if existing:
             self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
-        tx_nums = self.db.read_history(hashX, limit=None)
+        if hashX not in self.hashX_history_cache:
+            tx_nums = self.db.read_history(hashX, limit=None)
+        else:
+            tx_nums = self.hashX_history_cache[hashX]
         history = ''
         for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         for tx_hash, height in new_history:
             history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
         if history:
@@ -1262,11 +1281,7 @@ class BlockchainProcessorService(BlockchainService):
         existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
         if existing:
             self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
-        tx_nums = self.db.read_history(hashX, limit=None)
-        history = ''
-        for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
-        history += self.mempool.mempool_history(hashX)
+        history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
         if history:
             status = sha256(history.encode())
             self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
@@ -1463,6 +1478,8 @@ class BlockchainProcessorService(BlockchainService):
         self.pending_support_amount_change.clear()
         self.touched_hashXs.clear()
         self.mempool.clear()
+        self.hashX_history_cache.clear()
+        self.hashX_full_cache.clear()
 
     def backup_block(self):
         assert len(self.db.prefix_db._op_stack) == 0
-- 
2.49.1


From 7c5b02c5a1998d13aeb62db60df213fd9d8b837a Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Tue, 12 Apr 2022 15:39:02 -0400
Subject: [PATCH 2/3] mempool metrics

---
 scribe/hub/mempool.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py
index e98b632..7422896 100644
--- a/scribe/hub/mempool.py
+++ b/scribe/hub/mempool.py
@@ -4,7 +4,7 @@ import attr
 import typing
 import logging
 from collections import defaultdict
-from prometheus_client import Histogram
+from prometheus_client import Histogram, Gauge
 import rocksdb.errors
 from scribe import PROMETHEUS_NAMESPACE
 from scribe.common import HISTOGRAM_BUCKETS
@@ -39,6 +39,10 @@ mempool_process_time_metric = Histogram(
     "processed_mempool", "Time to process mempool and notify touched addresses",
     namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
 )
+mempool_tx_count_metric = Gauge("mempool_tx_count", "Transactions in mempool", namespace=NAMESPACE)
+mempool_touched_address_count_metric = Gauge(
+    "mempool_touched_address_count", "Count of addresses touched by transactions in mempool", namespace=NAMESPACE
+)
 
 
 class HubMemPool:
@@ -134,6 +138,9 @@ class HubMemPool:
             for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs):
                 self.touched_hashXs[hashX].add(tx_hash)
                 touched_hashXs.add(hashX)
+
+        mempool_tx_count_metric.set(len(self.txs))
+        mempool_touched_address_count_metric.set(len(self.touched_hashXs))
         return touched_hashXs
 
     def transaction_summaries(self, hashX):
-- 
2.49.1


From a2c046b5134a5c32d6044fc137b64221eb3f0098 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Tue, 12 Apr 2022 16:24:49 -0400
Subject: [PATCH 3/3] fix touched_hashXs not being cleaned up

---
 scribe/hub/mempool.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py
index 7422896..8d7dbd9 100644
--- a/scribe/hub/mempool.py
+++ b/scribe/hub/mempool.py
@@ -52,6 +52,7 @@ class HubMemPool:
         self.logger = logging.getLogger(__name__)
         self.txs = {}
         self.raw_mempool = {}
+        self.tx_touches = {}
         self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set)  # None can be a key
         self.refresh_secs = refresh_secs
         self.mempool_process_time_metric = mempool_process_time_metric
@@ -74,18 +75,18 @@ class HubMemPool:
                 self.logger.error("failed to process mempool, retrying later")
                 return set()
             raise err
-
         # hashXs = self.hashXs  # hashX: [tx_hash, ...]
         touched_hashXs = set()
 
         # Remove txs that aren't in mempool anymore
         for tx_hash in set(self.txs).difference(self.raw_mempool.keys()):
             tx = self.txs.pop(tx_hash)
-            tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs})
+            tx_hashXs = self.tx_touches.pop(tx_hash)
             for hashX in tx_hashXs:
-                if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
-                    self.touched_hashXs[hashX].remove(tx_hash)
-                    if not self.touched_hashXs[hashX]:
+                if hashX in self.touched_hashXs:
+                    if tx_hash in self.touched_hashXs[hashX]:
+                        self.touched_hashXs[hashX].remove(tx_hash)
+                    if not len(self.touched_hashXs[hashX]):
                         self.touched_hashXs.pop(hashX)
             touched_hashXs.update(tx_hashXs)
 
@@ -133,11 +134,13 @@ class HubMemPool:
             tx.fee = max(0, (sum(v for _, v in tx.prevouts) -
                              sum(v for _, v in tx.out_pairs)))
             self.txs[tx_hash] = tx
+            self.tx_touches[tx_hash] = tx_touches = set()
             # print(f"added {tx_hash[::-1].hex()} reader to mempool")
 
             for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs):
                 self.touched_hashXs[hashX].add(tx_hash)
                 touched_hashXs.add(hashX)
+                tx_touches.add(hashX)
 
         mempool_tx_count_metric.set(len(self.txs))
         mempool_touched_address_count_metric.set(len(self.touched_hashXs))
-- 
2.49.1