add ResumableSHA256 and HashXHistoryHasherPrefixRow column family

Jack Robison 2022-12-29 10:04:11 -05:00
parent 405cef8d28
commit b7de08ba0b
5 changed files with 114 additions and 54 deletions
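The new column family persists the running SHA-256 state of each address's confirmed history, so status hashes can be extended block by block instead of re-hashing the whole history string every time. hub.common.ResumableSHA256 itself is not shown in this diff; below is a rough sketch of what such a wrapper might look like, assuming it builds on the rehash dependency added in setup.py (the real class, constructor arguments, and serialization format may differ).

import pickle
import rehash

class ResumableSHA256Sketch:
    """Illustrative stand-in for hub.common.ResumableSHA256; names and format are assumptions."""

    def __init__(self, state: bytes = None):
        # restore a previously persisted midstate, or start a fresh hasher
        self._hasher = pickle.loads(state) if state else rehash.sha256()

    def update(self, data: bytes):
        self._hasher.update(data)

    def digest(self) -> bytes:
        # digesting does not finalize the hasher; it can keep accepting updates
        return self._hasher.digest()

    def get_state(self) -> bytes:
        # serialize the midstate so it can be stored as a rocksdb value
        return pickle.dumps(self._hasher)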

View file

@@ -51,6 +51,7 @@ class DB_PREFIXES(enum.Enum):
reposted_count = b'j'
effective_amount = b'i'
future_effective_amount = b'k'
hashX_history_hash = b'l'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@@ -3,6 +3,7 @@ import struct
import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from hub.common import ResumableSHA256
from hub.db.common import DB_PREFIXES
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
from hub.schema.url import normalize_name
@@ -1851,6 +1852,46 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
class HashXHistoryHasherKey(NamedTuple):
hashX: bytes
class HashXHistoryHasherValue(NamedTuple):
hasher: ResumableSHA256
class HashXHistoryHasherPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_history_hash.value
key_struct = struct.Struct(b'>11s')
value_struct = struct.Struct(b'>120s')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>11s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
return HashXHistoryHasherKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
return super().pack_value(hasher.get_state())
@classmethod
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
@classmethod
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
return cls.pack_key(hashX), cls.pack_value(hasher)
class PrefixDB(BasePrefixDB):
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
@@ -1897,6 +1938,7 @@ class PrefixDB(BasePrefixDB):
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
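For illustration only (not part of the diff): assuming the shared PrefixRow helpers behave for this new row as they do for the other rows above, a round trip through the new column family would look roughly like this. The hashX value is hypothetical.

hashX = bytes(11)  # addresses are keyed by an 11-byte hashX

hasher = ResumableSHA256()
hasher.update(b'deadbeef:1:')

# persist the serialized hasher state under the hashX key
prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
prefix_db.unsafe_commit()

# later: restore the hasher from its stored state and keep extending the same digest
restored = prefix_db.hashX_history_hasher.get(hashX).hasher
restored.update(b'cafebabe:2:')
status = restored.digest()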

View file

@@ -5,7 +5,7 @@ import time
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right
from hub.common import sha256
from hub.common import ResumableSHA256
from hub.db import SecondaryDB
@@ -35,16 +35,19 @@ class PrimaryDB(SecondaryDB):
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
def hashX_status_from_history(history: bytes) -> ResumableSHA256:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
digest = hashlib.sha256()
for tx_num, tx_hash in zip(
digest = ResumableSHA256()
digest.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
return digest.digest()
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return digest
start = time.perf_counter()
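The rewritten hashX_status_from_history now returns the hasher itself rather than a finished digest, so the same object can be persisted and extended later. The change relies on a basic property of SHA-256: feeding the history string in pieces yields the same digest as hashing it in one call, for example:

import hashlib

# Old behavior: hash the concatenated history in one call.
one_shot = hashlib.sha256(b'txid1:1:' + b'txid2:2:').digest()

# New behavior (plain hashlib as a stand-in for ResumableSHA256): feed the same entries in pieces.
incremental = hashlib.sha256()
incremental.update(b'txid1:1:')   # history already indexed
incremental.update(b'txid2:2:')   # entries appended by a later block
assert incremental.digest() == one_shot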
@@ -67,17 +70,24 @@ class PrimaryDB(SecondaryDB):
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
digester = hashX_status_from_history(history)
status = digester.digest()
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
existing_digester = prefix_db.hashX_history_hasher.get(hashX)
if not existing_status:
prefix_db.stash_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stash_raw_delete(key, existing_status)
prefix_db.stash_raw_put(key, status)
op_cnt += 2
if not existing_digester:
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 1
else:
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")

View file

@@ -1,3 +1,4 @@
import rehash
import time
import asyncio
import typing
@@ -11,7 +12,8 @@ from hub import PROMETHEUS_NAMESPACE
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
from hub.common import ResumableSHA256, LFUCacheWithMetrics
from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@@ -169,10 +171,22 @@ class BlockchainProcessorService(BlockchainService):
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
touched_hashXs = list(self.mempool.update_mempool(to_put))
if self.env.index_address_status:
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
status_hashers = {
k: v.hasher if v else ResumableSHA256() for k, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
)
}
for hashX, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
if v is not None:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
hasher = status_hashers[hashX]
hasher.update(self.mempool.mempool_history(hashX).encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
for tx_hash, raw_tx in to_put:
mempool_prefix.stash_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
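As this hunk reads, the mempool status is produced by extending a freshly deserialized copy of the address's confirmed-history hasher with its mempool history; the stored confirmed state itself is not written back here. A condensed, single-address sketch of that flow, using only names from this diff:

row = self.db.prefix_db.hashX_history_hasher.get(hashX)
hasher = row.hasher if row else ResumableSHA256()            # fresh copy, not the stored object
hasher.update(self.mempool.mempool_history(hashX).encode())  # append unconfirmed history
mempool_status = hasher.digest()
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (mempool_status,))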
@@ -1637,15 +1651,6 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache[hashX] = history
return history
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block):
txo_count = 0
txi_count = 0
@@ -1745,9 +1750,7 @@ class BlockchainProcessorService(BlockchainService):
# update hashX history status hashes and compactify the histories
self._get_update_hashX_histories_ops(height)
# only compactify address histories and update the status index if we're already caught up,
# a bulk update will happen once catchup finishes
if not self.db.catching_up and self.env.index_address_status:
if self.env.index_address_status:
self._get_compactify_ops(height)
self.db.last_indexed_address_status_height = height
@@ -1802,6 +1805,17 @@ class BlockchainProcessorService(BlockchainService):
)
def _get_compactify_ops(self, height: int):
def _rebuild_hasher(hist_tx_nums):
hasher = ResumableSHA256()
hasher.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return hasher
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
if existing_hashX_statuses:
pack_key = self.db.prefix_db.hashX_status.pack_key
@@ -1816,6 +1830,13 @@ class BlockchainProcessorService(BlockchainService):
append_deletes_hashX_history = block_hashX_history_deletes.append
block_hashX_history_puts = []
existing_status_hashers = {
k: v.hasher if v else None for k, v in zip(
self.hashXs_by_tx,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
)
}
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
@@ -1830,11 +1851,9 @@ class BlockchainProcessorService(BlockchainService):
unpack_key = self.db.prefix_db.hashX_history.unpack_key
needs_compaction = False
total_hist_txs = b''
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
deserialize_value=False):
hist_txs = unpack_history(hist)
total_hist_txs += hist
txs_extend(hist_txs)
hist_height = unpack_key(k).height
if height > reorg_limit and hist_height < height - reorg_limit:
@@ -1853,27 +1872,17 @@ class BlockchainProcessorService(BlockchainService):
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
if not new_history:
continue
needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append
tx_infos = {}
for tx_num in tx_nums:
cached_tx_info = self.history_tx_info_cache.get(tx_num)
if cached_tx_info is not None:
tx_infos[tx_num] = cached_tx_info
else:
append_needed_tx_info(tx_num)
if needed_tx_infos:
for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
tx_infos[tx_num] = tx_info
self.history_tx_info_cache[tx_num] = tx_info
history = ''.join(map(tx_infos.__getitem__, tx_nums))
for tx_hash, height in new_history:
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
hasher = existing_status_hashers[hashX]
if hasher is None:
hasher = _rebuild_hasher(tx_nums)
else:
self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
hasher.update(b''.join(
f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
))
self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
status = hasher.digest()
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
@@ -2135,9 +2144,6 @@ class BlockchainProcessorService(BlockchainService):
async def _finished_initial_catch_up(self):
self.log.info(f'caught up to height {self.height}')
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
# Flush everything but with catching_up->False state.
self.db.catching_up = False

View file

@@ -44,7 +44,8 @@ setup(
'filetype==1.0.9',
'grpcio==1.38.0',
'lbry-rocksdb==0.8.2',
'ujson==5.4.0'
'ujson==5.4.0',
'rehash==1.0.0'
],
extras_require={
'lint': ['pylint==2.10.0'],
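The new rehash dependency provides hashlib-compatible hashers whose midstate can be serialized and restored, which is what makes a resumable status index possible. A minimal standalone check of that behavior, independent of hub's wrapper (hub's own serialization may differ):

import pickle
import rehash

h = rehash.sha256(b'partial ')
state = pickle.dumps(h)                     # capture the midstate

restored = pickle.loads(state)
restored.update(b'input')
assert restored.hexdigest() == rehash.sha256(b'partial input').hexdigest()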