update the history cache in place instead of clearing/rebuilding

Jack Robison 2022-05-21 15:25:07 -04:00
parent 7263ec553e
commit 75e9123eaf
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 75 additions and 34 deletions
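
The heart of the change: on each new block, cached address histories are extended in place with the block's new transactions instead of being discarded and rebuilt from the database on the next request. A minimal, self-contained sketch of that pattern (illustration only, with hypothetical names; the commit's real implementation is update_history_caches below):

from typing import Dict, List

db_history: Dict[bytes, List[int]] = {b'addr1': [1, 5, 9]}  # toy stand-in for the database
cache: Dict[bytes, List[int]] = {b'addr1': [1, 5, 9]}       # warm history cache

def on_block(touched: List[bytes]) -> None:
    # append new tx numbers to already-cached histories instead of
    # invalidating them; addresses that are not cached are skipped
    for hashX in set(touched):
        cached = cache.get(hashX)
        if cached is None:
            continue
        full = db_history[hashX]
        # histories only grow while the chain moves forward, so the
        # cached prefix stays valid; extend with the new suffix only
        cached.extend(full[len(cached):])

db_history[b'addr1'].append(12)  # a new tx touches the address
on_block([b'addr1'])
assert cache[b'addr1'] == [1, 5, 9, 12]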

hub/db/db.py

@@ -1173,7 +1173,7 @@ class HubDB:
             raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
         return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
 
-    def _read_history(self, hashX: bytes, limit: int = 1000) -> List[int]:
+    def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]:
         txs = []
         txs_extend = txs.extend
         for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
@@ -1182,7 +1182,7 @@ class HubDB:
                 break
         return txs
 
-    async def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]:
+    async def read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]:
         return await asyncio.get_event_loop().run_in_executor(self._executor, self._read_history, hashX, limit)
 
     async def limited_history(self, hashX, *, limit=1000):
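
Loosening `limit` to `Optional[int]` lets callers pass `None` to read an entire history; the new cache-maintenance code below relies on this via `_read_history(hashX, None)`. A sketch of the likely limit semantics, assuming the loop body (elided from this hunk) stops once `limit` entries have been collected:

from typing import List, Optional

def read_history(pages: List[List[int]], limit: Optional[int] = 1000) -> List[int]:
    # `pages` stands in for the hashX_history rows iterated from the db
    txs: List[int] = []
    txs_extend = txs.extend
    for hist in pages:
        txs_extend(hist)
        # with limit=None this check never trips, so the full history is read
        if limit is not None and len(txs) >= limit:
            break
    return txs

assert read_history([[1, 2], [3, 4]], limit=2) == [1, 2]
assert read_history([[1, 2], [3, 4]], limit=None) == [1, 2, 3, 4]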

hub/herald/service.py

@@ -46,9 +46,12 @@ class HubServerService(BlockchainReaderService):
     def advance(self, height: int):
         super().advance(height)
         touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs
+        self.session_manager.update_history_caches(touched_hashXs)
         self.notifications_to_send.append((set(touched_hashXs), height))
 
     def unwind(self):
+        self.session_manager.hashX_raw_history_cache.clear()
+        self.session_manager.hashX_history_cache.clear()
         prev_count = self.db.tx_counts.pop()
         tx_count = self.db.tx_counts[-1]
         self.db.headers.pop()
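
In-place maintenance is only sound while the chain moves forward. unwind() runs on a reorg, where transactions already inside cached histories may have been orphaned, so the suffix-append trick cannot repair them and both caches are dropped wholesale. A small illustration (hypothetical names, not code from this commit):

true_history = [1, 5, 9]               # tx numbers for one address
cache = {b'addr': list(true_history)}  # warm cache

# advance: the history grows, the cached prefix stays valid -> extend
true_history.append(12)
cache[b'addr'].extend(true_history[len(cache[b'addr']):])
assert cache[b'addr'] == [1, 5, 9, 12]

# unwind (reorg): txs 9 and 12 are orphaned; entries already cached are
# now wrong and appending cannot fix them, so clear and rebuild lazily
true_history = [1, 5]
cache.clear()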

hub/herald/session.py

@@ -1,5 +1,5 @@
 import os
 import ssl
+import sys
 import math
 import time
 import codecs
@@ -23,7 +23,7 @@ from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
 from hub.herald.search import SearchIndex
 from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
 from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
-from hub.common import LRUCache, LRUCacheWithMetrics, asyncify_for_loop
+from hub.common import LRUCacheWithMetrics
 from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
 from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
 from hub.herald.framer import NewlineFramer
@@ -33,6 +33,8 @@ if typing.TYPE_CHECKING:
     from hub.scribe.daemon import LBCDaemon
     from hub.herald.mempool import HubMemPool
 
+PYTHON_VERSION = sys.version_info.major, sys.version_info.minor
+TypedDict = dict if PYTHON_VERSION < (3, 8) else typing.TypedDict
 
 BAD_REQUEST = 1
 DAEMON_ERROR = 2
@@ -43,6 +45,11 @@ SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args '
                            'required_names other_names')
 
+
+class CachedAddressHistoryItem(TypedDict):
+    tx_hash: str
+    height: int
+
 
 def scripthash_to_hashX(scripthash: str) -> bytes:
     try:
         bin_hash = hex_str_to_hash(scripthash)
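
The `dict` fallback keeps the TypedDict purely a typing aid: at runtime, calling the class with keyword arguments yields a plain dict on both old and new interpreters, so serialization code never sees a special type. A quick self-contained check of that assumption:

import sys
import typing

PYTHON_VERSION = sys.version_info.major, sys.version_info.minor
TypedDict = dict if PYTHON_VERSION < (3, 8) else typing.TypedDict

class Item(TypedDict):
    tx_hash: str
    height: int

# either way, construction produces an ordinary dict-shaped object
item = Item(tx_hash='aa' * 32, height=42)
assert item == {'tx_hash': 'aa' * 32, 'height': 42}
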
@@ -205,16 +212,50 @@ class SessionManager:
             elastic_host=env.elastic_host, elastic_port=env.elastic_port
         )
         self.running = False
-        self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='raw_history', namespace=NAMESPACE)
-        self.hashX_full_cache = LRUCacheWithMetrics(2 ** 12, metric_name='full_history', namespace=NAMESPACE)
-        self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE)
+        # hashX: List[int]
+        self.hashX_raw_history_cache = LRUCacheWithMetrics(2 ** 16, metric_name='raw_history', namespace=NAMESPACE)
+        # hashX: List[CachedAddressHistoryItem]
+        self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='full_history', namespace=NAMESPACE)
+        # tx_num: Tuple[txid, height]
+        self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 19, metric_name='history_tx', namespace=NAMESPACE)
 
     def clear_caches(self):
         self.hashX_history_cache.clear()
-        self.hashX_full_cache.clear()
         self.resolve_outputs_cache.clear()
         self.resolve_cache.clear()
 
+    def update_history_caches(self, touched_hashXs: typing.List[bytes]):
+        update_history_cache = {}
+        for hashX in set(touched_hashXs):
+            history_tx_nums = None
+            # if the history is in the raw_history_cache, update it
+            # TODO: use a reversed iterator for this instead of rescanning it all
+            if hashX in self.hashX_raw_history_cache:
+                self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None)
+            # if it's in hashX_history_cache, prepare to update it in a batch
+            if hashX in self.hashX_history_cache:
+                full_cached = self.hashX_history_cache[hashX]
+                if history_tx_nums is None:
+                    history_tx_nums = self.db._read_history(hashX, None)
+                new_txs = history_tx_nums[len(full_cached):]
+                update_history_cache[hashX] = full_cached, new_txs
+        if update_history_cache:
+            # get the set of new tx nums that were touched in all of the new histories to be cached
+            total_tx_nums = set()
+            for _, new_txs in update_history_cache.values():
+                total_tx_nums.update(new_txs)
+            total_tx_nums = list(total_tx_nums)
+            # collect the total new tx infos
+            referenced_new_txs = {
+                tx_num: (CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)))
+                for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums))
+            }
+            # update the cached history lists
+            get_referenced = referenced_new_txs.__getitem__
+            for hashX, (full, new_txs) in update_history_cache.items():
+                append_to_full = full.append
+                for tx_num in new_txs:
+                    append_to_full(get_referenced(tx_num))
 
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
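
Both `update_history_caches` and the history reader below recover a block height from a global transaction number with `bisect_right(self.db.tx_counts, tx_num)`. This works because `tx_counts` is cumulative: entry h holds the total number of transactions up to and including block h (which is also why `unwind` pops `tx_counts`). A worked example under that assumption:

from bisect import bisect_right

# assumed shape of db.tx_counts: cumulative tx totals per height;
# block 0 has 1 tx, block 1 has 2 txs, block 2 has 3 txs:
tx_counts = [1, 3, 6]

# tx_num is a zero-based global transaction index; bisect_right finds
# the first height whose cumulative count exceeds tx_num
assert bisect_right(tx_counts, 0) == 0   # tx 0 -> block 0
assert bisect_right(tx_counts, 1) == 1   # tx 1 -> block 1
assert bisect_right(tx_counts, 2) == 1   # tx 2 -> block 1
assert bisect_right(tx_counts, 5) == 2   # tx 5 -> block 2
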
@@ -603,45 +644,47 @@ class SessionManager:
         self.txs_sent += 1
         return hex_hash
 
-    async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]:
-        cached_full_history = self.hashX_full_cache.get(hashX)
+    async def _cached_raw_history(self, hashX: bytes, limit: typing.Optional[int] = None):
+        tx_nums = self.hashX_raw_history_cache.get(hashX)
+        if tx_nums is None:
+            self.hashX_raw_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit)
+        return tx_nums
+
+    async def cached_confirmed_history(self, hashX: bytes,
+                                       limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]:
+        cached_full_history = self.hashX_history_cache.get(hashX)
         # return the cached history
         if cached_full_history is not None:
             self.address_history_size_metric.observe(len(cached_full_history))
             return cached_full_history
-        if hashX not in self.hashX_history_cache:
-            limit = self.env.max_send // 97
-            self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit)
-        else:
-            tx_nums = self.hashX_history_cache[hashX]
-        self.address_history_size_metric.observe(len(tx_nums))
+        # return the history and update the caches
+        tx_nums = await self._cached_raw_history(hashX, limit)
         needed_tx_infos = []
         append_needed_tx_info = needed_tx_infos.append
         tx_infos = {}
-        cnt = 0
-        for tx_num in tx_nums:
+        for cnt, tx_num in enumerate(tx_nums):  # determine which tx_hashes are cached and which we need to look up
             cached = self.history_tx_info_cache.get(tx_num)
             if cached is not None:
                 tx_infos[tx_num] = cached
             else:
                 append_needed_tx_info(tx_num)
             cnt += 1
             if cnt % 1000 == 0:
                 await asyncio.sleep(0)
-        if needed_tx_infos:
-            for tx_num, tx_hash in zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos)):
-                hist = tx_hash[::-1].hex(), bisect_right(self.db.tx_counts, tx_num)
+        if needed_tx_infos:  # request all the needed tx hashes in one batch, cache the txids and heights
+            for cnt, (tx_num, tx_hash) in enumerate(zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos))):
+                hist = CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num))
                 tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist
                 cnt += 1
                 if cnt % 1000 == 0:
                     await asyncio.sleep(0)
         # ensure the ordering of the txs
         history = []
         history_append = history.append
-        for tx_num in tx_nums:
+        for cnt, tx_num in enumerate(tx_nums):
             history_append(tx_infos[tx_num])
-        self.hashX_full_cache[hashX] = history
+            cnt += 1
+            if cnt % 1000 == 0:
+                await asyncio.sleep(0)
+        self.hashX_history_cache[hashX] = history
         self.address_history_size_metric.observe(len(history))
         return history
 
     def _notify_peer(self, peer):
@@ -1463,13 +1506,8 @@ class LBRYElectrumX(asyncio.Protocol):
     async def confirmed_and_unconfirmed_history(self, hashX):
         # Note history is ordered but unconfirmed is unordered in e-s
-        history = await self.session_manager.limited_history(hashX)
-        conf = [
-            item async for item in asyncify_for_loop(
-                ({'tx_hash': txid, 'height': height} for txid, height in history), 1000
-            )
-        ]
-        return conf + self.unconfirmed_history(hashX)
+        history = await self.session_manager.cached_confirmed_history(hashX)
+        return history + self.unconfirmed_history(hashX)
 
     async def scripthash_get_history(self, scripthash):
         """Return the confirmed and unconfirmed history of a scripthash."""