From b7de08ba0b198071fb6796ad1f14aba1ab80e8e3 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 29 Dec 2022 10:04:11 -0500
Subject: [PATCH] add `ResumableSHA256` and `HashXHistoryHasherPrefixRow` column family

---
 hub/db/common.py      |  1 +
 hub/db/prefixes.py    | 42 ++++++++++++++++++++
 hub/scribe/db.py      | 32 +++++++++------
 hub/scribe/service.py | 90 +++++++++++++++++++++++--------------------
 setup.py              |  3 +-
 5 files changed, 114 insertions(+), 54 deletions(-)

diff --git a/hub/db/common.py b/hub/db/common.py
index 1f01736..9869f5a 100644
--- a/hub/db/common.py
+++ b/hub/db/common.py
@@ -51,6 +51,7 @@ class DB_PREFIXES(enum.Enum):
     reposted_count = b'j'
     effective_amount = b'i'
     future_effective_amount = b'k'
+    hashX_history_hash = b'l'
 
 
 COLUMN_SETTINGS = {}  # this is updated by the PrefixRow metaclass
diff --git a/hub/db/prefixes.py b/hub/db/prefixes.py
index 2b864f9..44abe1b 100644
--- a/hub/db/prefixes.py
+++ b/hub/db/prefixes.py
@@ -3,6 +3,7 @@ import struct
 import array
 import base64
 from typing import Union, Tuple, NamedTuple, Optional
+from hub.common import ResumableSHA256
 from hub.db.common import DB_PREFIXES
 from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
 from hub.schema.url import normalize_name
@@ -1851,6 +1852,46 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
         return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
 
 
+class HashXHistoryHasherKey(NamedTuple):
+    hashX: bytes
+
+
+class HashXHistoryHasherValue(NamedTuple):
+    hasher: ResumableSHA256
+
+
+class HashXHistoryHasherPrefixRow(PrefixRow):
+    prefix = DB_PREFIXES.hashX_history_hash.value
+    key_struct = struct.Struct(b'>11s')
+    value_struct = struct.Struct(b'>120s')
+    cache_size = 1024 * 1024 * 64
+
+    key_part_lambdas = [
+        lambda: b'',
+        struct.Struct(b'>11s').pack
+    ]
+
+    @classmethod
+    def pack_key(cls, hashX: bytes):
+        return super().pack_key(hashX)
+
+    @classmethod
+    def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
+        return HashXHistoryHasherKey(*super().unpack_key(key))
+
+    @classmethod
+    def pack_value(cls, hasher: ResumableSHA256) -> bytes:
+        return super().pack_value(hasher.get_state())
+
+    @classmethod
+    def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
+        return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
+
+    @classmethod
+    def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
+        return cls.pack_key(hashX), cls.pack_value(hasher)
+
+
 class PrefixDB(BasePrefixDB):
     def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
                  secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
@@ -1897,6 +1938,7 @@ class PrefixDB(BasePrefixDB):
         self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
         self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
         self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
+        self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
 
 
 def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
diff --git a/hub/scribe/db.py b/hub/scribe/db.py
index 6561731..f47259e 100644
--- a/hub/scribe/db.py
+++ b/hub/scribe/db.py
@@ -5,7 +5,7 @@ import time
 from typing import List
 from concurrent.futures.thread import ThreadPoolExecutor
 from bisect import bisect_right
-from hub.common import sha256
+from hub.common import ResumableSHA256
 from hub.db import SecondaryDB
 
 
@@ -35,16 +35,19 @@ class PrimaryDB(SecondaryDB):
             if last_hashX:
                 yield last_hashX
 
-        def hashX_status_from_history(history: bytes) -> bytes:
+        def hashX_status_from_history(history: bytes) -> ResumableSHA256:
             tx_counts = self.tx_counts
             hist_tx_nums = array.array('I')
             hist_tx_nums.frombytes(history)
-            digest = hashlib.sha256()
-            for tx_num, tx_hash in zip(
+            digest = ResumableSHA256()
+            digest.update(
+                b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
+                for tx_num, tx_hash in zip(
                     hist_tx_nums,
-                    self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
-                digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
-            return digest.digest()
+                    self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
+                ))
+            )
+            return digest
 
         start = time.perf_counter()
 
@@ -67,17 +70,24 @@ class PrimaryDB(SecondaryDB):
                 hashX_cnt += 1
                 key = prefix_db.hashX_status.pack_key(hashX)
                 history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
-                status = hashX_status_from_history(history)
+                digester = hashX_status_from_history(history)
+                status = digester.digest()
                 existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
-                if existing_status and existing_status == status:
-                    continue
-                elif not existing_status:
+                existing_digester = prefix_db.hashX_history_hasher.get(hashX)
+                if not existing_status:
                     prefix_db.stash_raw_put(key, status)
                     op_cnt += 1
                 else:
                     prefix_db.stash_raw_delete(key, existing_status)
                     prefix_db.stash_raw_put(key, status)
                     op_cnt += 2
+                if not existing_digester:
+                    prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
+                    op_cnt += 1
+                else:
+                    prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
+                    prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
+                    op_cnt += 2
                 if op_cnt > 100000:
                     prefix_db.unsafe_commit()
                     self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
diff --git a/hub/scribe/service.py b/hub/scribe/service.py
index b077181..d3f9f57 100644
--- a/hub/scribe/service.py
+++ b/hub/scribe/service.py
@@ -1,3 +1,4 @@
+import rehash
 import time
 import asyncio
 import typing
@@ -11,7 +12,8 @@ from hub import PROMETHEUS_NAMESPACE
 from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from hub.error.base import ChainError
-from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
+from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
+from hub.common import ResumableSHA256, LFUCacheWithMetrics
 from hub.scribe.db import PrimaryDB
 from hub.scribe.daemon import LBCDaemon
 from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@@ -169,10 +171,22 @@ class BlockchainProcessorService(BlockchainService):
 
         def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
             self.mempool.remove(to_delete)
-            touched_hashXs = self.mempool.update_mempool(to_put)
+            touched_hashXs = list(self.mempool.update_mempool(to_put))
             if self.env.index_address_status:
-                for hashX in touched_hashXs:
-                    self._get_update_hashX_mempool_status_ops(hashX)
+                status_hashers = {
+                    k: v.hasher if v else ResumableSHA256() for k, v in zip(
+                        touched_hashXs,
+                        self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
+                    )
+                }
+                for hashX, v in zip(
+                        touched_hashXs,
+                        self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
+                    if v is not None:
+                        self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
+                    hasher = status_hashers[hashX]
+                    hasher.update(self.mempool.mempool_history(hashX).encode())
+                    self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
             for tx_hash, raw_tx in to_put:
                 mempool_prefix.stash_put((tx_hash,), (raw_tx,))
             for tx_hash, raw_tx in to_delete.items():
@@ -1637,15 +1651,6 @@ class BlockchainProcessorService(BlockchainService):
             self.hashX_full_cache[hashX] = history
         return history
 
-    def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
-        existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
-        if existing:
-            self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
-        history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
-        if history:
-            status = sha256(history.encode())
-            self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
-
     def advance_block(self, block: Block):
         txo_count = 0
         txi_count = 0
@@ -1745,9 +1750,7 @@ class BlockchainProcessorService(BlockchainService):
 
         # update hashX history status hashes and compactify the histories
         self._get_update_hashX_histories_ops(height)
-        # only compactify adddress histories and update the status index if we're already caught up,
-        # a bulk update will happen once catchup finishes
-        if not self.db.catching_up and self.env.index_address_status:
+        if self.env.index_address_status:
             self._get_compactify_ops(height)
             self.db.last_indexed_address_status_height = height
 
@@ -1802,6 +1805,17 @@ class BlockchainProcessorService(BlockchainService):
         )
 
     def _get_compactify_ops(self, height: int):
+        def _rebuild_hasher(hist_tx_nums):
+            hasher = ResumableSHA256()
+            hasher.update(
+                b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
+                for tx_num, tx_hash in zip(
+                    hist_tx_nums,
+                    self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
+                ))
+            )
+            return hasher
+
         existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
         if existing_hashX_statuses:
             pack_key = self.db.prefix_db.hashX_status.pack_key
@@ -1816,6 +1830,13 @@ class BlockchainProcessorService(BlockchainService):
             append_deletes_hashX_history = block_hashX_history_deletes.append
         block_hashX_history_puts = []
 
+        existing_status_hashers = {
+            k: v.hasher if v else None for k, v in zip(
+                self.hashXs_by_tx,
+                self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
+            )
+        }
+
         for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
             new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
 
@@ -1830,11 +1851,9 @@ class BlockchainProcessorService(BlockchainService):
             unpack_key = self.db.prefix_db.hashX_history.unpack_key
 
             needs_compaction = False
-            total_hist_txs = b''
             for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
                                                                    deserialize_value=False):
                 hist_txs = unpack_history(hist)
-                total_hist_txs += hist
                 txs_extend(hist_txs)
                 hist_height = unpack_key(k).height
                 if height > reorg_limit and hist_height < height - reorg_limit:
@@ -1853,27 +1872,17 @@ class BlockchainProcessorService(BlockchainService):
                 block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
             if not new_history:
                 continue
-
-            needed_tx_infos = []
-            append_needed_tx_info = needed_tx_infos.append
-            tx_infos = {}
-            for tx_num in tx_nums:
-                cached_tx_info = self.history_tx_info_cache.get(tx_num)
-                if cached_tx_info is not None:
-                    tx_infos[tx_num] = cached_tx_info
-                else:
-                    append_needed_tx_info(tx_num)
-            if needed_tx_infos:
-                for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
-                    tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
-                    tx_infos[tx_num] = tx_info
-                    self.history_tx_info_cache[tx_num] = tx_info
-            history = ''.join(map(tx_infos.__getitem__, tx_nums))
-            for tx_hash, height in new_history:
-                history += f'{tx_hash[::-1].hex()}:{height:d}:'
-            if history:
-                status = sha256(history.encode())
-                self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
+            hasher = existing_status_hashers[hashX]
+            if hasher is None:
+                hasher = _rebuild_hasher(tx_nums)
+            else:
+                self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
+            hasher.update(b''.join(
+                f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
+            ))
+            self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
+            status = hasher.digest()
+            self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
 
         self.db.prefix_db.multi_delete(block_hashX_history_deletes)
         self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
@@ -2135,9 +2144,6 @@ class BlockchainProcessorService(BlockchainService):
 
     async def _finished_initial_catch_up(self):
         self.log.info(f'caught up to height {self.height}')
-        if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
-            await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
-
         # Flush everything but with catching_up->False state.
         self.db.catching_up = False
diff --git a/setup.py b/setup.py
index a61059f..71a198e 100644
--- a/setup.py
+++ b/setup.py
@@ -44,7 +44,8 @@ setup(
         'filetype==1.0.9',
         'grpcio==1.38.0',
        'lbry-rocksdb==0.8.2',
-        'ujson==5.4.0'
+        'ujson==5.4.0',
+        'rehash==1.0.0'
     ],
     extras_require={
         'lint': ['pylint==2.10.0'],
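
Note on the pattern this patch introduces: each address (hashX) keeps a SHA-256 hasher whose
serialized state is persisted in the new hashX_history_hasher column family, so a block or
mempool update only feeds the new `txid:height:` entries into the saved hasher instead of
re-hashing the address's entire history. The sketch below is illustrative only, built on the
`rehash` library pinned in setup.py: the pickle round-trip and the in-memory `db` dict are
stand-ins for hub.common.ResumableSHA256.get_state() and the real column family (whose values
are the fixed 120-byte states packed by value_struct), not the actual hub implementation.

    # Illustrative sketch: incremental address-status hashing with a resumable hasher.
    import hashlib
    import pickle

    import rehash  # resumable hashlib-compatible hashers (the dependency added in setup.py)

    db = {}  # hashX -> serialized hasher state (stand-in for the hashX_history_hasher rows)

    def update_status(hashX: bytes, new_history: list) -> bytes:
        """Feed only the new `txid:height:` entries into the persisted hasher."""
        state = db.get(hashX)
        hasher = pickle.loads(state) if state is not None else rehash.sha256()
        hasher.update(b''.join(
            f'{tx_hash[::-1].hex()}:{height:d}:'.encode()
            for tx_hash, height in new_history
        ))
        db[hashX] = pickle.dumps(hasher)  # resumable: the state survives a restart
        return hasher.digest()            # the value stored under hashX_status

    # The incremental digest matches hashing the full history from scratch.
    hashX = b'\x01' * 11                  # keys are 11-byte hashXs, matching key_struct '>11s'
    block_1 = [(b'\xaa' * 32, 101)]
    block_2 = [(b'\xbb' * 32, 102), (b'\xcc' * 32, 102)]
    update_status(hashX, block_1)
    status = update_status(hashX, block_2)
    full = hashlib.sha256(b''.join(
        f'{tx_hash[::-1].hex()}:{height:d}:'.encode()
        for tx_hash, height in block_1 + block_2
    )).digest()
    assert status == full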