add address history largest-value-cache

Jack Robison 2022-05-03 16:36:35 -04:00
parent 3b7850802a
commit 114f8a393e
4 changed files with 129 additions and 2 deletions


@@ -5,6 +5,7 @@ import logging
import logging.handlers
import typing
import collections
from bisect import bisect_right
from asyncio import get_event_loop, Event
from prometheus_client import Counter
@@ -253,6 +254,78 @@ class LRUCache:
        self.clear()


class LargestValueCache:
    __slots__ = [
        '_capacity',
        '_cache',
        '_sizes',
        '_keys'
    ]

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._cache = {}
        self._keys = []
        self._sizes = []

    def items(self):
        return self._cache.items()

    def get(self, key, default=None):
        return self._cache.get(key, default)

    @property
    def full(self):
        return len(self._cache) >= self._capacity

    def set(self, key, value) -> bool:
        if not self.full:
            idx = len(self._sizes) - bisect_right(list(reversed(self._sizes)), len(value))
            self._sizes.insert(idx, len(value))
            self._keys.insert(idx, key)
            self._cache[key] = value
            return True
        elif key in self._cache:  # item is already cached, update it
            existing = self._keys.index(key)
            if len(value) != self._sizes[existing]:
                self._keys.pop(existing)
                self._sizes.pop(existing)
                idx = len(self._sizes) - bisect_right(list(reversed(self._sizes)), len(value))
                self._sizes.insert(idx, len(value))
                self._keys.insert(idx, key)
            self._cache[key] = value
            return True
        elif len(value) > self._sizes[-1]:
            self._sizes.pop()
            self._cache.pop(self._keys.pop())
            idx = len(self._sizes) - bisect_right(list(reversed(self._sizes)), len(value))
            self._sizes.insert(idx, len(value))
            self._keys.insert(idx, key)
            self._cache[key] = value
            return True
        return False

    def clear(self):
        self._cache.clear()
        self._sizes.clear()
        self._keys.clear()

    def pop(self, key, default=None):
        return self._cache.pop(key, default)

    def __setitem__(self, key, value):
        return self.set(key, value)

    def __getitem__(self, item):
        return self.get(item)

    def __contains__(self, item) -> bool:
        return item in self._cache

    def __len__(self):
        return len(self._cache)


# the ipaddress module does not show these subnets as reserved
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
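
A quick usage sketch of the eviction behavior (illustrative only: the keys and list values are made up, and it assumes the class above is importable from scribe.common, as the hub service below imports it):

    from scribe.common import LargestValueCache

    cache = LargestValueCache(2)         # keep only the two largest values
    cache.set('a', [1, 2, 3])            # accepted: the cache is not yet full
    cache.set('b', [1])                  # accepted: fills the cache
    cache.set('c', [1, 2])               # accepted: evicts 'b', the smallest entry
    assert 'b' not in cache and 'c' in cache
    assert cache.set('d', [1]) is False  # rejected: not larger than the smallest cached value

Values are ranked by len(), and _sizes is kept sorted largest-first; that is why each insertion point is computed with bisect_right over the reversed list.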


@@ -1,12 +1,14 @@
import time
import typing
import asyncio
from bisect import bisect_right
from scribe.blockchain.daemon import LBCDaemon
from scribe.hub.session import SessionManager
from scribe.hub.mempool import HubMemPool
from scribe.hub.udp import StatusServer
from scribe.service import BlockchainReaderService
from scribe.elasticsearch import ElasticNotifierClientProtocol
from scribe.common import LargestValueCache

if typing.TYPE_CHECKING:
    from scribe.hub.env import ServerEnv
@@ -20,8 +22,10 @@ class HubServerService(BlockchainReaderService):
        self.status_server = StatusServer()
        self.daemon = LBCDaemon(env.coin, env.daemon_url)  # only needed for broadcasting txs
        self.mempool = HubMemPool(self.env.coin, self.db)
        self.large_history_cache = LargestValueCache(10000)  # stores histories as lists of tx nums
        self.large_full_history_cache = LargestValueCache(10000)  # stores serialized histories
        self.session_manager = SessionManager(
            env, self.db, self.mempool, self.daemon,
            env, self.db, self.mempool, self.daemon, self.large_full_history_cache,
            self.shutdown_event,
            on_available_callback=self.status_server.set_available,
            on_unavailable_callback=self.status_server.set_unavailable
@@ -43,9 +47,36 @@

    def clear_search_cache(self):
        self.session_manager.search_index.clear_caches()

    def _reorg_detected(self):
        self.large_history_cache.clear()  # TODO: these could be pruned instead of cleared
        self.large_full_history_cache.clear()

    def advance(self, height: int):
        super().advance(height)
        touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs

        # update cached address histories
        for hashX in touched_hashXs:
            history = self.large_history_cache.get(hashX) or []
            extend_history = history.extend
            full_history = self.large_full_history_cache.get(hashX) or []
            start_height = 0
            if full_history:
                start_height = full_history[-1][1] + 1
            for hist in self.db.prefix_db.hashX_history.iterate(start=(hashX, start_height),
                                                                stop=(hashX, height + 1), include_key=False):
                extend_history(hist)
            if not history:
                continue
            if self.large_history_cache.set(hashX, list(history)):  # only the largest will be cached
                precached_offset = len(full_history or [])
                needed_history = history[precached_offset:]
                if needed_history:
                    append_full_history = full_history.append
                    for tx_num, tx_hash in zip(needed_history, self.db.get_tx_hashes(needed_history)):
                        append_full_history((tx_hash, bisect_right(self.db.tx_counts, tx_num)))
                    self.large_full_history_cache.set(hashX, full_history)

        self.notifications_to_send.append((set(touched_hashXs), height))

    def _detect_changes(self):
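
The (tx_hash, height) pairs in advance() are built by converting each global tx_num to a block height with bisect_right over db.tx_counts. This relies on tx_counts being a cumulative per-block transaction total, which its use here implies; a standalone sketch with made-up numbers:

    from bisect import bisect_right

    # Hypothetical cumulative totals: block 0 holds tx 0, block 1 holds
    # txs 1-2, block 2 holds txs 3-5, block 3 holds txs 6-9.
    tx_counts = [1, 3, 6, 10]

    def tx_num_to_height(tx_num: int) -> int:
        return bisect_right(tx_counts, tx_num)

    assert tx_num_to_height(0) == 0
    assert tx_num_to_height(2) == 1
    assert tx_num_to_height(5) == 2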
@@ -94,7 +125,23 @@ class HubServerService(BlockchainReaderService):
            self.env.host, self.env.udp_port, self.env.allow_lan_udp
        )

    def populate_largest_histories_cache(self):
        self.log.info("populating address history cache")
        for hashX, history in self.db.prefix_db.hashX_history.iterate():
            self.large_history_cache[hashX] = history
        self.log.info("sorted %i largest address histories", len(self.large_history_cache))
        for hashX, history in self.large_history_cache.items():
            full_history = []
            append_full_history = full_history.append
            for tx_num, tx_hash in zip(history, self.db.get_tx_hashes(history)):
                append_full_history((tx_hash, bisect_right(self.db.tx_counts, tx_num)))
            self.large_full_history_cache.set(hashX, full_history)
        self.log.info("cached %i largest address histories ranging from %i to %i txs in length",
                      len(self.large_history_cache), self.large_history_cache._sizes[-1],
                      self.large_history_cache._sizes[0])

    def _iter_start_tasks(self):
        # self.populate_largest_histories_cache()
        yield self.start_status_server()
        yield self.start_cancellable(self.es_notification_client.maintain_connection)
        yield self.start_cancellable(self.mempool.send_notifications_forever)


@@ -165,7 +165,7 @@ class SessionManager:
    )

    def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool',
                 daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
                 daemon: 'LBCDaemon', large_history_cache, shutdown_event: asyncio.Event,
                 on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
        env.max_send = max(350000, env.max_send)
        self.env = env
@@ -184,6 +184,7 @@ class SessionManager:
        self.txs_sent = 0
        self.start_time = time.time()
        self.history_cache = {}
        self.largest_history_cache = large_history_cache
        self.resolve_outputs_cache = {}
        self.resolve_cache = {}
        self.notified_height: typing.Optional[int] = None
@@ -594,6 +595,8 @@

    async def limited_history(self, hashX):
        """A caching layer."""
        if hashX in self.largest_history_cache:
            return self.largest_history_cache[hashX]
        if hashX not in self.history_cache:
            # History DoS limit. Each element of history is about 99
            # bytes when encoded as JSON. This limits resource usage
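
The lookup order matters here: the shared largest-value cache is consulted before the per-manager history_cache, so the heaviest address histories are served without touching the per-request path or the database. A generic sketch of the same two-tier pattern (the names below are hypothetical, not scribe's API):

    def two_tier_get(key, shared_cache, local_cache, fetch):
        # Tier 1: shared cache holding only the largest (most expensive) entries.
        hit = shared_cache.get(key)
        if hit is not None:
            return hit
        # Tier 2: plain dict cache, filled on demand from the database.
        if key not in local_cache:
            local_cache[key] = fetch(key)
        return local_cache[key]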


@@ -186,6 +186,9 @@ class BlockchainReaderService(BlockchainService):
            assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
        self.db.merkle_cache.clear()

    def _reorg_detected(self):
        pass

    def _detect_changes(self):
        try:
            self.db.prefix_db.try_catch_up_with_primary()
@@ -196,6 +199,7 @@ class BlockchainReaderService(BlockchainService):
        if not state or state.height <= 0:
            return
        if self.last_state and self.last_state.height > state.height:
            self._reorg_detected()
            self.log.warning("reorg detected, waiting until the writer has flushed the new blocks to advance")
            return
        last_height = 0 if not self.last_state else self.last_state.height