From a48564e3b2a73877e932947be27f96c579ded8a4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 16 Oct 2022 14:14:29 -0400 Subject: [PATCH] batched db integrity checks --- hub/db/interface.py | 58 ++++++++++------- hub/db/revertable.py | 84 ++++++++++++++++++++++--- hub/scribe/db.py | 16 ++--- hub/scribe/service.py | 143 +++++++++++++++++++++--------------------- 4 files changed, 187 insertions(+), 114 deletions(-) diff --git a/hub/db/interface.py b/hub/db/interface.py index de5bab3..9a2895c 100644 --- a/hub/db/interface.py +++ b/hub/db/interface.py @@ -89,6 +89,12 @@ class PrefixRow(metaclass=PrefixRowType): if v: return v if not deserialize_value else self.unpack_value(v) + def key_exists(self, *key_args): + key_may_exist, _ = self._db.key_may_exist((self._column_family, self.pack_key(*key_args))) + if not key_may_exist: + return False + return self._db.get((self._column_family, self.pack_key(*key_args)), fill_cache=True) is not None + def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True): packed_keys = {tuple(args): self.pack_key(*args) for args in key_args} db_result = self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args], @@ -118,26 +124,28 @@ class PrefixRow(metaclass=PrefixRowType): if idx % step == 0: await asyncio.sleep(0) - def stage_multi_put(self, items): - self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items]) + def stash_multi_put(self, items): + self._op_stack.stash_ops([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items]) + + def stash_multi_delete(self, items): + self._op_stack.stash_ops([RevertableDelete(self.pack_key(*k), self.pack_value(*v)) for k, v in items]) def get_pending(self, *key_args, fill_cache=True, deserialize_value=True): packed_key = self.pack_key(*key_args) - last_op = self._op_stack.get_last_op_for_key(packed_key) - if last_op: - if last_op.is_put: - return last_op.value if not deserialize_value else self.unpack_value(last_op.value) - else: # it's a delete - return - v = self._db.get((self._column_family, packed_key), fill_cache=fill_cache) - if v: - return v if not deserialize_value else self.unpack_value(v) + pending_op = self._op_stack.get_pending_op(packed_key) + if pending_op and pending_op.is_delete: + return + if pending_op: + v = pending_op.value + else: + v = self._db.get((self._column_family, packed_key), fill_cache=fill_cache) + return None if v is None else (v if not deserialize_value else self.unpack_value(v)) - def stage_put(self, key_args=(), value_args=()): - self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))) + def stash_put(self, key_args=(), value_args=()): + self._op_stack.stash_ops([RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))]) - def stage_delete(self, key_args=(), value_args=()): - self._op_stack.append_op(RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args))) + def stash_delete(self, key_args=(), value_args=()): + self._op_stack.stash_ops([RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args))]) @classmethod def pack_partial_key(cls, *args) -> bytes: @@ -181,7 +189,7 @@ class BasePrefixDB: settings = COLUMN_SETTINGS[prefix.value] column_family_options[prefix.value] = rocksdb.ColumnFamilyOptions() column_family_options[prefix.value].table_factory = rocksdb.BlockBasedTableFactory( - block_cache=rocksdb.LRUCache(settings['cache_size']), + block_cache=rocksdb.LRUCache(settings['cache_size']) ) self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {} options = rocksdb.Options( @@ -206,6 +214,7 @@ class BasePrefixDB: Write staged changes to the database without keeping undo information Changes written cannot be undone """ + self.apply_stash() try: if not len(self._op_stack): return @@ -226,6 +235,7 @@ class BasePrefixDB: """ Write changes for a block height to the database and keep undo information so that the changes can be reverted """ + self.apply_stash() undo_ops = self._op_stack.get_undo_ops() delete_undos = [] if height > self._max_undo_depth: @@ -275,6 +285,9 @@ class BasePrefixDB: finally: self._op_stack.clear() + def apply_stash(self): + self._op_stack.validate_and_apply_stashed_ops() + def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]: cf = self.column_families[key[:1]] return self._db.get((cf, key), fill_cache=fill_cache) @@ -282,16 +295,15 @@ class BasePrefixDB: def multi_get(self, keys: typing.List[bytes], fill_cache=True): if len(keys) == 0: return [] - first_key = keys[0] get_cf = self.column_families.__getitem__ db_result = self._db.multi_get([(get_cf(k[:1]), k) for k in keys], fill_cache=fill_cache) return list(db_result.values()) def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]): - self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items]) + self._op_stack.stash_ops([RevertableDelete(k, v) for k, v in items]) def multi_put(self, items: typing.List[typing.Tuple[bytes, bytes]]): - self._op_stack.multi_put([RevertablePut(k, v) for k, v in items]) + self._op_stack.stash_ops([RevertablePut(k, v) for k, v in items]) def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None, iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None, @@ -310,11 +322,11 @@ class BasePrefixDB: def try_catch_up_with_primary(self): self._db.try_catch_up_with_primary() - def stage_raw_put(self, key: bytes, value: bytes): - self._op_stack.append_op(RevertablePut(key, value)) + def stash_raw_put(self, key: bytes, value: bytes): + self._op_stack.stash_ops([RevertablePut(key, value)]) - def stage_raw_delete(self, key: bytes, value: bytes): - self._op_stack.append_op(RevertableDelete(key, value)) + def stash_raw_delete(self, key: bytes, value: bytes): + self._op_stack.stash_ops([RevertableDelete(key, value)]) def estimate_num_keys(self, column_family: 'rocksdb.ColumnFamilyHandle' = None): return int(self._db.get_property(b'rocksdb.estimate-num-keys', column_family).decode()) diff --git a/hub/db/revertable.py b/hub/db/revertable.py index 37fbc4e..9e66a27 100644 --- a/hub/db/revertable.py +++ b/hub/db/revertable.py @@ -1,8 +1,8 @@ import struct import logging from string import printable -from collections import defaultdict -from typing import Tuple, Iterable, Callable, Optional, List +from collections import defaultdict, deque +from typing import Tuple, Iterable, Callable, Optional, List, Deque from hub.db.common import DB_PREFIXES _OP_STRUCT = struct.Struct('>BLL') @@ -97,9 +97,76 @@ class RevertableOpStack: """ self._get = get_fn self._multi_get = multi_get_fn + # a defaultdict of verified ops ready to be applied self._items = defaultdict(list) + # a faster deque of ops that have not yet been checked for integrity errors + self._stash: Deque[RevertableOp] = deque() + self._stashed_last_op_for_key = {} self._unsafe_prefixes = unsafe_prefixes or set() + def stash_ops(self, ops: Iterable[RevertableOp]): + self._stash.extend(ops) + for op in ops: + self._stashed_last_op_for_key[op.key] = op + + def validate_and_apply_stashed_ops(self): + if not self._stash: + return + + ops_to_apply = [] + append_op_needed = ops_to_apply.append + pop_staged_op = self._stash.popleft + unique_keys = set() + + # nullify the ops that cancel against the most recent staged for a key + while self._stash: + op = pop_staged_op() + if self._items[op.key] and op.invert() == self._items[op.key][-1]: + self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both + continue + elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op + continue # raise an error? + else: + append_op_needed(op) + unique_keys.add(op.key) + for op in ops_to_apply: + if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1] == op.invert(): + self._items[op.key].pop() + if not self._items[op.key]: + self._items.pop(op.key) + continue + if not self._enforce_integrity: + self._items[op.key].append(op) + continue + stored_val = existing[op.key] + has_stored_val = stored_val is not None + delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) + will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) + try: + if op.is_delete: + if has_stored_val and stored_val != op.value and not will_delete_existing_stored: + # there is a value and we're not deleting it in this op + # check that a delete for the stored value is in the stack + raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}") + elif not stored_val: + raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") + elif stored_val != op.value: + raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") + else: + if has_stored_val and not will_delete_existing_stored: + raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}") + if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1].is_put: + raise OpStackIntegrity(f"db op tries to overwrite with {op} before deleting pending " + f"{self._items[op.key][-1]}") + except OpStackIntegrity as err: + if op.key[:1] in self._unsafe_prefixes: + log.debug(f"skipping over integrity error: {err}") + else: + raise err + self._items[op.key].append(op) + + self._stashed_last_op_for_key.clear() + def append_op(self, op: RevertableOp): """ Apply a put or delete op, checking that it introduces no integrity errors @@ -217,15 +284,10 @@ class RevertableOpStack: raise err self._items[op.key].append(op) - def extend_ops(self, ops: Iterable[RevertableOp]): - """ - Apply a sequence of put or delete ops, checking that they introduce no integrity errors - """ - for op in ops: - self.append_op(op) - def clear(self): self._items.clear() + self._stash.clear() + self._stashed_last_op_for_key.clear() def __len__(self): return sum(map(len, self._items.values())) @@ -254,6 +316,8 @@ class RevertableOpStack: op, packed = RevertableOp.unpack(packed) self.append_op(op) - def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]: + def get_pending_op(self, key: bytes) -> Optional[RevertableOp]: + if key in self._stashed_last_op_for_key: + return self._stashed_last_op_for_key[key] if key in self._items and self._items[key]: return self._items[key][-1] diff --git a/hub/scribe/db.py b/hub/scribe/db.py index c578518..691a551 100644 --- a/hub/scribe/db.py +++ b/hub/scribe/db.py @@ -46,7 +46,7 @@ class PrimaryDB(SecondaryDB): if start_height <= 0: self.logger.info("loading all blockchain addresses, this will take a little while...") - hashXs = [hashX for hashX in hashX_iterator()] + hashXs = list({hashX for hashX in hashX_iterator()}) else: self.logger.info("loading addresses since block %i...", start_height) hashXs = set() @@ -68,11 +68,11 @@ class PrimaryDB(SecondaryDB): if existing_status and existing_status == status: continue elif not existing_status: - prefix_db.stage_raw_put(key, status) + prefix_db.stash_raw_put(key, status) op_cnt += 1 else: - prefix_db.stage_raw_delete(key, existing_status) - prefix_db.stage_raw_put(key, status) + prefix_db.stash_raw_delete(key, existing_status) + prefix_db.stash_raw_put(key, status) op_cnt += 2 if op_cnt > 100000: prefix_db.unsafe_commit() @@ -93,8 +93,8 @@ class PrimaryDB(SecondaryDB): def apply_expiration_extension_fork(self): # TODO: this can't be reorged for k, v in self.prefix_db.claim_expiration.iterate(): - self.prefix_db.claim_expiration.stage_delete(k, v) - self.prefix_db.claim_expiration.stage_put( + self.prefix_db.claim_expiration.stash_delete(k, v) + self.prefix_db.claim_expiration.stash_put( (bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime, k.tx_num, k.position), v ) @@ -104,8 +104,8 @@ class PrimaryDB(SecondaryDB): """Write (UTXO) state to the batch.""" if self.db_height > 0: existing = self.prefix_db.db_state.get() - self.prefix_db.db_state.stage_delete((), existing.expanded) - self.prefix_db.db_state.stage_put((), ( + self.prefix_db.db_state.stash_delete((), existing.expanded) + self.prefix_db.db_state.stash_put((), ( self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version, self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 7e7ef81..a751043 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -171,9 +171,9 @@ class BlockchainProcessorService(BlockchainService): for hashX in touched_hashXs: self._get_update_hashX_mempool_status_ops(hashX) for tx_hash, raw_tx in to_put: - mempool_prefix.stage_put((tx_hash,), (raw_tx,)) + mempool_prefix.stash_put((tx_hash,), (raw_tx,)) for tx_hash, raw_tx in to_delete.items(): - mempool_prefix.stage_delete((tx_hash,), (raw_tx,)) + mempool_prefix.stash_delete((tx_hash,), (raw_tx,)) unsafe_commit() async with self.lock: @@ -403,69 +403,66 @@ class BlockchainProcessorService(BlockchainService): def get_add_claim_utxo_ops(self, pending: StagedClaimtrieItem): # claim tip by claim hash - self.db.prefix_db.claim_to_txo.stage_put( + self.db.prefix_db.claim_to_txo.stash_put( (pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount, pending.channel_signature_is_valid, pending.name) ) # claim hash by txo - self.db.prefix_db.txo_to_claim.stage_put( + self.db.prefix_db.txo_to_claim.stash_put( (pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name) ) # claim expiration - self.db.prefix_db.claim_expiration.stage_put( + self.db.prefix_db.claim_expiration.stash_put( (pending.expiration_height, pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name) ) # short url resolution - self.db.prefix_db.claim_short_id.stage_multi_put( - [ - ( - (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], - pending.root_tx_num, pending.root_position), - (pending.tx_num, pending.position) - ) for prefix_len in range(10) - ] + for prefix_len in range(10): + self.db.prefix_db.claim_short_id.stash_put( + (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], + pending.root_tx_num, pending.root_position), + (pending.tx_num, pending.position) ) if pending.signing_hash and pending.channel_signature_is_valid: # channel by stream - self.db.prefix_db.claim_to_channel.stage_put( + self.db.prefix_db.claim_to_channel.stash_put( (pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,) ) # stream by channel - self.db.prefix_db.channel_to_claim.stage_put( + self.db.prefix_db.channel_to_claim.stash_put( (pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position), (pending.claim_hash,) ) if pending.reposted_claim_hash: - self.db.prefix_db.repost.stage_put((pending.claim_hash,), (pending.reposted_claim_hash,)) - self.db.prefix_db.reposted_claim.stage_put( + self.db.prefix_db.repost.stash_put((pending.claim_hash,), (pending.reposted_claim_hash,)) + self.db.prefix_db.reposted_claim.stash_put( (pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,) ) def get_remove_claim_utxo_ops(self, pending: StagedClaimtrieItem): # claim tip by claim hash - self.db.prefix_db.claim_to_txo.stage_delete( + self.db.prefix_db.claim_to_txo.stash_delete( (pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount, pending.channel_signature_is_valid, pending.name) ) # claim hash by txo - self.db.prefix_db.txo_to_claim.stage_delete( + self.db.prefix_db.txo_to_claim.stash_delete( (pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name) ) # claim expiration - self.db.prefix_db.claim_expiration.stage_delete( + self.db.prefix_db.claim_expiration.stash_delete( (pending.expiration_height, pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name) ) # short url resolution - for prefix_len in range(10): # TODO: multi_delete - self.db.prefix_db.claim_short_id.stage_delete( + for prefix_len in range(10): + self.db.prefix_db.claim_short_id.stash_delete( (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], pending.root_tx_num, pending.root_position), (pending.tx_num, pending.position) @@ -473,18 +470,18 @@ class BlockchainProcessorService(BlockchainService): if pending.signing_hash and pending.channel_signature_is_valid: # channel by stream - self.db.prefix_db.claim_to_channel.stage_delete( + self.db.prefix_db.claim_to_channel.stash_delete( (pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,) ) # stream by channel - self.db.prefix_db.channel_to_claim.stage_delete( + self.db.prefix_db.channel_to_claim.stash_delete( (pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position), (pending.claim_hash,) ) if pending.reposted_claim_hash: - self.db.prefix_db.repost.stage_delete((pending.claim_hash,), (pending.reposted_claim_hash,)) - self.db.prefix_db.reposted_claim.stage_delete( + self.db.prefix_db.repost.stash_delete((pending.claim_hash,), (pending.reposted_claim_hash,)) + self.db.prefix_db.reposted_claim.stash_delete( (pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,) ) self.reposted_count_delta[pending.reposted_claim_hash] -= 1 @@ -495,8 +492,8 @@ class BlockchainProcessorService(BlockchainService): self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.value # print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.value}") - self.db.prefix_db.claim_to_support.stage_put((supported_claim_hash, tx_num, nout), (txo.value,)) - self.db.prefix_db.support_to_claim.stage_put((tx_num, nout), (supported_claim_hash,)) + self.db.prefix_db.claim_to_support.stash_put((supported_claim_hash, tx_num, nout), (txo.value,)) + self.db.prefix_db.support_to_claim.stash_put((tx_num, nout), (supported_claim_hash,)) self.pending_support_amount_change[supported_claim_hash] += txo.value def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'TxOutput', @@ -604,20 +601,20 @@ class BlockchainProcessorService(BlockchainService): def _get_invalidate_signature_ops(self, pending: StagedClaimtrieItem): if not pending.signing_hash: return - self.db.prefix_db.claim_to_channel.stage_delete( + self.db.prefix_db.claim_to_channel.stash_delete( (pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,) ) if pending.channel_signature_is_valid: - self.db.prefix_db.channel_to_claim.stage_delete( + self.db.prefix_db.channel_to_claim.stash_delete( (pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position), (pending.claim_hash,) ) - self.db.prefix_db.claim_to_txo.stage_delete( + self.db.prefix_db.claim_to_txo.stash_delete( (pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount, pending.channel_signature_is_valid, pending.name) ) - self.db.prefix_db.claim_to_txo.stage_put( + self.db.prefix_db.claim_to_txo.stash_put( (pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount, False, pending.name) @@ -750,25 +747,25 @@ class BlockchainProcessorService(BlockchainService): def get_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int, activation_height: int, name: str, amount: int): - self.db.prefix_db.activated.stage_put( + self.db.prefix_db.activated.stash_put( (txo_type, tx_num, position), (activation_height, claim_hash, name) ) - self.db.prefix_db.pending_activation.stage_put( + self.db.prefix_db.pending_activation.stash_put( (activation_height, txo_type, tx_num, position), (claim_hash, name) ) - self.db.prefix_db.active_amount.stage_put( + self.db.prefix_db.active_amount.stash_put( (claim_hash, txo_type, activation_height, tx_num, position), (amount,) ) def get_remove_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int, activation_height: int, name: str, amount: int): - self.db.prefix_db.activated.stage_delete( + self.db.prefix_db.activated.stash_delete( (txo_type, tx_num, position), (activation_height, claim_hash, name) ) - self.db.prefix_db.pending_activation.stage_delete( + self.db.prefix_db.pending_activation.stash_delete( (activation_height, txo_type, tx_num, position), (claim_hash, name) ) - self.db.prefix_db.active_amount.stage_delete( + self.db.prefix_db.active_amount.stash_delete( (claim_hash, txo_type, activation_height, tx_num, position), (amount,) ) @@ -979,7 +976,7 @@ class BlockchainProcessorService(BlockchainService): if not has_candidate: # remove name takeover entry, the name is now unclaimed controlling = get_controlling(need_takeover) - self.db.prefix_db.claim_takeover.stage_delete( + self.db.prefix_db.claim_takeover.stash_delete( (need_takeover,), (controlling.claim_hash, controlling.height) ) @@ -1104,10 +1101,10 @@ class BlockchainProcessorService(BlockchainService): self.taken_over_names.add(name) if controlling: - self.db.prefix_db.claim_takeover.stage_delete( + self.db.prefix_db.claim_takeover.stash_delete( (name,), (controlling.claim_hash, controlling.height) ) - self.db.prefix_db.claim_takeover.stage_put((name,), (winning_including_future_activations, height)) + self.db.prefix_db.claim_takeover.stash_put((name,), (winning_including_future_activations, height)) self.touched_claim_hashes.add(winning_including_future_activations) if controlling and controlling.claim_hash not in self.abandoned_claims: self.touched_claim_hashes.add(controlling.claim_hash) @@ -1172,10 +1169,10 @@ class BlockchainProcessorService(BlockchainService): self.taken_over_names.add(name) # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") if controlling: - self.db.prefix_db.claim_takeover.stage_delete( + self.db.prefix_db.claim_takeover.stash_delete( (name,), (controlling.claim_hash, controlling.height) ) - self.db.prefix_db.claim_takeover.stage_put((name,), (winning, height)) + self.db.prefix_db.claim_takeover.stash_put((name,), (winning, height)) if controlling: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning) @@ -1208,7 +1205,7 @@ class BlockchainProcessorService(BlockchainService): if delta != 0: new_count = existing_repost_counts.get(claim_hash, 0) + delta repost_count_puts.append(((claim_hash,), (new_count,))) - self.db.prefix_db.reposted_count.stage_multi_put(repost_count_puts) + self.db.prefix_db.reposted_count.stash_multi_put(repost_count_puts) # gather cumulative removed/touched sets to update the search index self.removed_claim_hashes.update(set(self.abandoned_claims.keys())) @@ -1239,8 +1236,8 @@ class BlockchainProcessorService(BlockchainService): total = amount if existing is not None: total += existing.amount - self.db.prefix_db.support_amount.stage_delete((supported_claim,), existing) - self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,)) + self.db.prefix_db.support_amount.stash_delete((supported_claim,), existing) + self.db.prefix_db.support_amount.stash_put((supported_claim,), (total,)) # use the cumulative changes to update bid ordered resolve # @@ -1252,7 +1249,7 @@ class BlockchainProcessorService(BlockchainService): removed_claim.normalized_name, removed ) if amt: - self.db.prefix_db.bid_order.stage_delete( + self.db.prefix_db.bid_order.stash_delete( (removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,) ) # update or insert new bid orders and prepare to update the effective amount index @@ -1267,7 +1264,7 @@ class BlockchainProcessorService(BlockchainService): claim_amount_info = self.db.get_url_effective_amount(name, touched) if claim_amount_info: prev_effective_amount = claim_amount_info.effective_amount - self.db.prefix_db.bid_order.stage_delete( + self.db.prefix_db.bid_order.stash_delete( (name, claim_amount_info.effective_amount, claim_amount_info.tx_num, claim_amount_info.position), (touched,) ) @@ -1279,19 +1276,19 @@ class BlockchainProcessorService(BlockchainService): amt = self.db.get_url_effective_amount(name, touched) if amt: prev_effective_amount = amt.effective_amount - self.db.prefix_db.bid_order.stage_delete( + self.db.prefix_db.bid_order.stash_delete( (name, prev_effective_amount, amt.tx_num, amt.position), (touched,) ) - new_effective_amount = self._get_pending_effective_amount(name, touched) - self.db.prefix_db.bid_order.stage_put( + new_effective_amount = touched_effective_amounts.get(touched, 0) + self.db.prefix_db.bid_order.stash_put( (name, new_effective_amount, tx_num, position), (touched,) ) if touched in self.claim_hash_to_txo or touched in self.removed_claim_hashes \ or touched in self.pending_support_amount_change: # exclude sending notifications for claims/supports that activated but # weren't added/spent in this block - self.db.prefix_db.trending_notification.stage_put( + self.db.prefix_db.trending_notification.stash_put( (height, touched), (prev_effective_amount, new_effective_amount) ) @@ -1333,7 +1330,7 @@ class BlockchainProcessorService(BlockchainService): if delete_effective_amounts: self.db.prefix_db.multi_delete(delete_effective_amounts) if new_effective_amounts: - self.db.prefix_db.effective_amount.stage_multi_put( + self.db.prefix_db.effective_amount.stash_multi_put( [((claim_hash,), (amount, support_sum)) for claim_hash, (amount, support_sum) in new_effective_amounts.items()] ) @@ -1343,8 +1340,8 @@ class BlockchainProcessorService(BlockchainService): channel_count_val = self.db.prefix_db.channel_count.get(channel_hash) channel_count = 0 if not channel_count_val else channel_count_val.count if channel_count_val is not None: - self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,)) - self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,)) + self.db.prefix_db.channel_count.stash_delete((channel_hash,), (channel_count,)) + self.db.prefix_db.channel_count.stash_put((channel_hash,), (channel_count + count,)) # update the sets of touched and removed claims for es sync self.touched_claim_hashes.update( @@ -1386,11 +1383,11 @@ class BlockchainProcessorService(BlockchainService): def _get_update_hashX_mempool_status_ops(self, hashX: bytes): existing = self.db.prefix_db.hashX_mempool_status.get(hashX) if existing: - self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing) + self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing) history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX) if history: status = sha256(history.encode()) - self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) + self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,)) def advance_block(self, block: Block): height = self.height + 1 @@ -1404,9 +1401,9 @@ class BlockchainProcessorService(BlockchainService): txs: List[Tuple[Tx, bytes]] = block.transactions txo_count = 0 - self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),)) - self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,)) - self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],)) + self.db.prefix_db.block_hash.stash_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),)) + self.db.prefix_db.header.stash_put(key_args=(height,), value_args=(block.header,)) + self.db.prefix_db.block_txs.stash_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],)) for tx, tx_hash in txs: spent_claims = {} @@ -1414,11 +1411,12 @@ class BlockchainProcessorService(BlockchainService): # leave txs in mempool that werent in the block mempool_tx = self.db.prefix_db.mempool_tx.get_pending(tx_hash) if mempool_tx: - self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), mempool_tx) + self.db.prefix_db.mempool_tx.stash_delete((tx_hash,), mempool_tx) + + self.db.prefix_db.tx.stash_put(key_args=(tx_hash,), value_args=(tx.raw,)) + self.db.prefix_db.tx_num.stash_put(key_args=(tx_hash,), value_args=(tx_count,)) + self.db.prefix_db.tx_hash.stash_put(key_args=(tx_count,), value_args=(tx_hash,)) - self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,)) - self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,)) - self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,)) # Spend the inputs for txin in tx.inputs: @@ -1476,9 +1474,9 @@ class BlockchainProcessorService(BlockchainService): # update effective amount and update sets of touched and deleted claims self._get_cumulative_update_ops(height) - self.db.prefix_db.touched_hashX.stage_put((height,), (list(sorted(self.touched_hashXs)),)) + self.db.prefix_db.touched_hashX.stash_put((height,), (list(sorted(self.touched_hashXs)),)) - self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) + self.db.prefix_db.tx_count.stash_put(key_args=(height,), value_args=(tx_count,)) # clear the mempool tx index self._get_clear_mempool_ops() @@ -1498,7 +1496,7 @@ class BlockchainProcessorService(BlockchainService): cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit # if height >= cached_max_reorg_depth: - self.db.prefix_db.touched_or_deleted.stage_put( + self.db.prefix_db.touched_or_deleted.stash_put( key_args=(height,), value_args=(self.touched_claim_hashes, self.removed_claim_hashes) ) @@ -1538,7 +1536,7 @@ class BlockchainProcessorService(BlockchainService): ) def _get_update_hashX_histories_ops(self, height: int): - self.db.prefix_db.hashX_history.stage_multi_put( + self.db.prefix_db.hashX_history.stash_multi_put( [((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()] ) @@ -1584,7 +1582,6 @@ class BlockchainProcessorService(BlockchainService): history_item_0 = (k, hist) elif hist_height > 0: needs_compaction = True - # self.db.prefix_db.stage_raw_delete(k, hist) append_deletes_hashX_history((k, hist)) existing_item_0 = history_item_0 if needs_compaction: @@ -1615,10 +1612,10 @@ class BlockchainProcessorService(BlockchainService): history += f'{tx_hash[::-1].hex()}:{height:d}:' if history: status = sha256(history.encode()) - self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) + self.db.prefix_db.hashX_status.stash_put((hashX,), (status,)) self.db.prefix_db.multi_delete(block_hashX_history_deletes) - self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts) + self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts) def clear_after_advance_or_reorg(self): self.txo_to_claim.clear() @@ -1733,8 +1730,8 @@ class BlockchainProcessorService(BlockchainService): if hashX: self.touched_hashXs.add(hashX) self.utxo_cache[(tx_hash, nout)] = (hashX, txout.value) - self.db.prefix_db.utxo.stage_put((hashX, tx_num, nout), (txout.value,)) - self.db.prefix_db.hashX_utxo.stage_put((tx_hash[:4], tx_num, nout), (hashX,)) + self.db.prefix_db.utxo.stash_put((hashX, tx_num, nout), (txout.value,)) + self.db.prefix_db.hashX_utxo.stash_put((tx_hash[:4], tx_num, nout), (hashX,)) return hashX def get_pending_tx_num(self, tx_hash: bytes) -> int: