use LargestValueCache for caching the largest needed full address histories

Jack Robison 2022-08-26 10:37:48 -04:00
parent fc38bda03c
commit cd9b91e1d9
3 changed files with 125 additions and 6 deletions


@@ -7,8 +7,10 @@ import logging
 import logging.handlers
 import typing
 import collections
+from bisect import insort_right
+from collections import deque
 from decimal import Decimal
-from typing import Iterable
+from typing import Iterable, Deque
 from asyncio import get_event_loop, Event
 from prometheus_client import Counter
 from hub.schema.tags import clean_tags
@@ -484,6 +486,89 @@ class LFUCacheWithMetrics(LFUCache):
         self.misses.inc()
 
 
+class LargestValueCacheItem:
+    __slots__ = [
+        'key',
+        'value',
+    ]
+
+    def __init__(self, key, value):
+        self.key = key
+        self.value = value
+
+    def __gt__(self, other):
+        return len(self.value) > len(other.value)
+
+    def __ge__(self, other):
+        return len(self.value) >= len(other.value)
+
+    def __lt__(self, other):
+        return len(self.value) < len(other.value)
+
+    def __le__(self, other):
+        return len(self.value) <= len(other.value)
+
+    def __eq__(self, other):
+        return len(self.value) == len(other.value)
+
+
+class LargestValueCache:
+    __slots__ = [
+        '_capacity',
+        '_cache',
+        '_raw_cache'
+    ]
+
+    def __init__(self, capacity: int):
+        self._capacity = max(capacity, 0)
+        self._cache = {}
+        self._raw_cache: Deque[LargestValueCacheItem] = deque()
+
+    def items(self):
+        return self._cache.items()
+
+    def get(self, key, default=None):
+        return self._cache.get(key, default)
+
+    @property
+    def full(self):
+        return len(self._cache) >= self._capacity
+
+    def set(self, key, value) -> bool:
+        if self._capacity == 0:
+            return False
+        if self.full:
+            if len(value) < len(self._raw_cache[0].value):
+                return False
+            popped = self._raw_cache.popleft()
+            self._cache.pop(popped.key)
+        item = LargestValueCacheItem(key, value)
+        insort_right(self._raw_cache, item)
+        self._cache[key] = value
+        return True
+
+    def clear(self):
+        self._cache.clear()
+        self._raw_cache.clear()
+
+    def pop(self, key):
+        value = self._cache.pop(key)
+        self._raw_cache.remove(LargestValueCacheItem(key, value))
+        return value
+
+    def __setitem__(self, key, value):
+        return self.set(key, value)
+
+    def __getitem__(self, item):
+        return self.get(item)
+
+    def __contains__(self, item) -> bool:
+        return item in self._cache
+
+    def __len__(self):
+        return len(self._cache)
+
+
 # the ipaddress module does not show these subnets as reserved
 CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
 IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
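
To make the eviction rule concrete: once the cache is at capacity, a new entry is admitted only if its value is longer than the shortest value currently held, which is then dropped. A minimal sketch using only the class added above:

# minimal sketch of LargestValueCache admission/eviction
cache = LargestValueCache(2)
cache['a'] = [1, 2, 3]   # admitted, cache not yet full
cache['b'] = [1]         # admitted, cache not yet full
cache['c'] = [1, 2]      # not shorter than the shortest cached value ([1]), so 'b' is evicted
assert 'b' not in cache and 'c' in cache
cache['d'] = [9]         # shorter than the shortest cached value ([1, 2]), rejected
assert 'd' not in cache and len(cache) == 2

Values compare purely by len() through LargestValueCacheItem's rich comparisons, and insort_right keeps _raw_cache ordered shortest-to-longest, so the eviction candidate is always at the left end of the deque.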


@@ -62,6 +62,7 @@ class HubServerService(BlockchainReaderService):
 
     def unwind(self):
         self.session_manager.hashX_raw_history_cache.clear()
+        self.session_manager.hashX_history_cache.clear()
         prev_count = self.db.tx_counts.pop()
         tx_count = self.db.tx_counts[-1]
         self.db.block_hashes.pop()
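
Dropping the whole cache on unwind is the simple safe choice: cached full histories may reference transactions from the block being removed, and LargestValueCache offers no invalidation finer than pop(). A hypothetical alternative would prune only the stale entries; in the sketch below, prune_above_height and new_tip_height are illustrative names, not part of the codebase:

# hypothetical: prune only entries whose cached history reaches past the new tip
def prune_above_height(cache, new_tip_height: int):
    stale = [hashX for hashX, history in cache.items()
             if history and history[-1].height > new_tip_height]
    for hashX in stale:
        cache.pop(hashX)

The clear() route matches how the raw history cache is handled one line above and keeps the reorg path trivial, which is reasonable since reorgs are rare.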


@@ -23,7 +23,7 @@ from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
 from hub.herald.search import SearchIndex
 from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
 from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, asyncify_for_loop
-from hub.common import LRUCacheWithMetrics, LFUCacheWithMetrics
+from hub.common import LRUCacheWithMetrics, LFUCacheWithMetrics, LargestValueCache
 from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
 from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
 from hub.herald.framer import NewlineFramer
@@ -215,20 +215,47 @@ class SessionManager:
         self.running = False
         # hashX: List[int]
         self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
-        # tx_num: CachedAddressHistoryItem
+        # hashX: List[CachedAddressHistoryItem]
+        self.hashX_history_cache = LargestValueCache(env.hashX_history_cache_size)
+        # tx_num: Tuple[txid, height]
         self.history_tx_info_cache = LFUCacheWithMetrics(env.history_tx_cache_size, metric_name='history_tx', namespace=NAMESPACE)
 
     def clear_caches(self):
         self.resolve_cache.clear()
 
     def update_history_caches(self, touched_hashXs: typing.List[bytes]):
-        # update_history_cache = {}
+        update_history_cache = {}
         for hashX in set(touched_hashXs):
-            # history_tx_nums = None
+            history_tx_nums = None
             # if the history is the raw_history_cache, update it
             # TODO: use a reversed iterator for this instead of rescanning it all
             if hashX in self.hashX_raw_history_cache:
-                self.hashX_raw_history_cache[hashX] = self.db._read_history(hashX, None)
+                self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None)
+            # if it's in hashX_history_cache, prepare to update it in a batch
+            if hashX in self.hashX_history_cache:
+                full_cached = self.hashX_history_cache[hashX]
+                if history_tx_nums is None:
+                    history_tx_nums = self.db._read_history(hashX, None)
+                new_txs = history_tx_nums[len(full_cached):]
+                update_history_cache[hashX] = full_cached, new_txs
+        if update_history_cache:
+            # get the set of new tx nums that were touched in all of the new histories to be cached
+            total_tx_nums = set()
+            for _, new_txs in update_history_cache.values():
+                total_tx_nums.update(new_txs)
+            total_tx_nums = list(total_tx_nums)
+            # collect the total new tx infos
+            referenced_new_txs = {
+                tx_num: (CachedAddressHistoryItem(
+                    tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)
+                )) for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums))
+            }
+            # update the cached history lists
+            get_referenced = referenced_new_txs.__getitem__
+            for hashX, (full, new_txs) in update_history_cache.items():
+                append_to_full = full.append
+                for tx_num in new_txs:
+                    append_to_full(get_referenced(tx_num))
 
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
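
Condensed to a single address, the batch update above amounts to the following sketch (refresh_one is an illustrative name; the db helpers, tx_counts, and CachedAddressHistoryItem are the ones the diff already uses):

from bisect import bisect_right

# sketch: extend one cached full history with its newly confirmed tail
def refresh_one(db, history_cache, hashX):
    full_cached = history_cache.get(hashX)
    if full_cached is None:
        return
    history_tx_nums = db._read_history(hashX, None)   # full tx_num list for the address
    new_txs = history_tx_nums[len(full_cached):]      # only the newly confirmed tail
    for tx_num, tx_hash in zip(new_txs, db._get_tx_hashes(new_txs)):
        full_cached.append(CachedAddressHistoryItem(
            tx_hash=tx_hash[::-1].hex(),              # txids are shown byte-reversed
            height=bisect_right(db.tx_counts, tx_num)))

The real method additionally de-duplicates tx_nums across all touched addresses so each tx hash and height is resolved once per block, and it appends in place, so the list objects already held by LargestValueCache stay current without re-insertion.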
@@ -626,6 +653,11 @@ class SessionManager:
 
     async def cached_confirmed_history(self, hashX: bytes,
                                        limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]:
+        cached_full_history = self.hashX_history_cache.get(hashX)
+        # return the cached history
+        if cached_full_history is not None:
+            self.address_history_size_metric.observe(len(cached_full_history))
+            return cached_full_history
         # return the history and update the caches
         tx_nums = await self._cached_raw_history(hashX, limit)
         needed_tx_infos = []
@@ -652,6 +684,7 @@ class SessionManager:
             history_append(tx_infos[tx_num])
             if cnt % 1000 == 0:
                 await asyncio.sleep(0)
+        self.hashX_history_cache[hashX] = history
         self.address_history_size_metric.observe(len(history))
        return history
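
With the store added above, the read path becomes: return the cached full history if present, otherwise build it and offer it to the cache. A hedged usage sketch (session_manager and hashX are assumed to be in scope; admission still depends on the size-based policy):

async def get_history_twice(session_manager, hashX: bytes):
    history = await session_manager.cached_confirmed_history(hashX)
    # if the first result was large enough to be admitted by LargestValueCache,
    # the repeat call is a plain dict hit and never touches the db
    cached = session_manager.hashX_history_cache.get(hashX)
    if cached is not None:
        assert cached is history
    return await session_manager.cached_confirmed_history(hashX)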