batched db integrity checks

This commit is contained in:
Jack Robison 2022-10-16 14:14:29 -04:00
parent 606e9bb0d6
commit a48564e3b2
4 changed files with 187 additions and 114 deletions

View file

@ -89,6 +89,12 @@ class PrefixRow(metaclass=PrefixRowType):
if v:
return v if not deserialize_value else self.unpack_value(v)
def key_exists(self, *key_args):
key_may_exist, _ = self._db.key_may_exist((self._column_family, self.pack_key(*key_args)))
if not key_may_exist:
return False
return self._db.get((self._column_family, self.pack_key(*key_args)), fill_cache=True) is not None
def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
db_result = self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
@ -118,26 +124,28 @@ class PrefixRow(metaclass=PrefixRowType):
if idx % step == 0:
await asyncio.sleep(0)
def stage_multi_put(self, items):
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def stash_multi_put(self, items):
self._op_stack.stash_ops([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def stash_multi_delete(self, items):
self._op_stack.stash_ops([RevertableDelete(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
packed_key = self.pack_key(*key_args)
last_op = self._op_stack.get_last_op_for_key(packed_key)
if last_op:
if last_op.is_put:
return last_op.value if not deserialize_value else self.unpack_value(last_op.value)
else: # it's a delete
return
v = self._db.get((self._column_family, packed_key), fill_cache=fill_cache)
if v:
return v if not deserialize_value else self.unpack_value(v)
pending_op = self._op_stack.get_pending_op(packed_key)
if pending_op and pending_op.is_delete:
return
if pending_op:
v = pending_op.value
else:
v = self._db.get((self._column_family, packed_key), fill_cache=fill_cache)
return None if v is None else (v if not deserialize_value else self.unpack_value(v))
def stage_put(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))
def stash_put(self, key_args=(), value_args=()):
self._op_stack.stash_ops([RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))])
def stage_delete(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args)))
def stash_delete(self, key_args=(), value_args=()):
self._op_stack.stash_ops([RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args))])
@classmethod
def pack_partial_key(cls, *args) -> bytes:
@ -181,7 +189,7 @@ class BasePrefixDB:
settings = COLUMN_SETTINGS[prefix.value]
column_family_options[prefix.value] = rocksdb.ColumnFamilyOptions()
column_family_options[prefix.value].table_factory = rocksdb.BlockBasedTableFactory(
block_cache=rocksdb.LRUCache(settings['cache_size']),
block_cache=rocksdb.LRUCache(settings['cache_size'])
)
self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {}
options = rocksdb.Options(
@ -206,6 +214,7 @@ class BasePrefixDB:
Write staged changes to the database without keeping undo information
Changes written cannot be undone
"""
self.apply_stash()
try:
if not len(self._op_stack):
return
@ -226,6 +235,7 @@ class BasePrefixDB:
"""
Write changes for a block height to the database and keep undo information so that the changes can be reverted
"""
self.apply_stash()
undo_ops = self._op_stack.get_undo_ops()
delete_undos = []
if height > self._max_undo_depth:
@ -275,6 +285,9 @@ class BasePrefixDB:
finally:
self._op_stack.clear()
def apply_stash(self):
self._op_stack.validate_and_apply_stashed_ops()
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
cf = self.column_families[key[:1]]
return self._db.get((cf, key), fill_cache=fill_cache)
@ -282,16 +295,15 @@ class BasePrefixDB:
def multi_get(self, keys: typing.List[bytes], fill_cache=True):
if len(keys) == 0:
return []
first_key = keys[0]
get_cf = self.column_families.__getitem__
db_result = self._db.multi_get([(get_cf(k[:1]), k) for k in keys], fill_cache=fill_cache)
return list(db_result.values())
def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items])
self._op_stack.stash_ops([RevertableDelete(k, v) for k, v in items])
def multi_put(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_put([RevertablePut(k, v) for k, v in items])
self._op_stack.stash_ops([RevertablePut(k, v) for k, v in items])
def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None,
iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None,
@ -310,11 +322,11 @@ class BasePrefixDB:
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
def stage_raw_put(self, key: bytes, value: bytes):
self._op_stack.append_op(RevertablePut(key, value))
def stash_raw_put(self, key: bytes, value: bytes):
self._op_stack.stash_ops([RevertablePut(key, value)])
def stage_raw_delete(self, key: bytes, value: bytes):
self._op_stack.append_op(RevertableDelete(key, value))
def stash_raw_delete(self, key: bytes, value: bytes):
self._op_stack.stash_ops([RevertableDelete(key, value)])
def estimate_num_keys(self, column_family: 'rocksdb.ColumnFamilyHandle' = None):
return int(self._db.get_property(b'rocksdb.estimate-num-keys', column_family).decode())

View file

@ -1,8 +1,8 @@
import struct
import logging
from string import printable
from collections import defaultdict
from typing import Tuple, Iterable, Callable, Optional, List
from collections import defaultdict, deque
from typing import Tuple, Iterable, Callable, Optional, List, Deque
from hub.db.common import DB_PREFIXES
_OP_STRUCT = struct.Struct('>BLL')
@ -97,9 +97,76 @@ class RevertableOpStack:
"""
self._get = get_fn
self._multi_get = multi_get_fn
# a defaultdict of verified ops ready to be applied
self._items = defaultdict(list)
# a faster deque of ops that have not yet been checked for integrity errors
self._stash: Deque[RevertableOp] = deque()
self._stashed_last_op_for_key = {}
self._unsafe_prefixes = unsafe_prefixes or set()
def stash_ops(self, ops: Iterable[RevertableOp]):
self._stash.extend(ops)
for op in ops:
self._stashed_last_op_for_key[op.key] = op
def validate_and_apply_stashed_ops(self):
if not self._stash:
return
ops_to_apply = []
append_op_needed = ops_to_apply.append
pop_staged_op = self._stash.popleft
unique_keys = set()
# nullify the ops that cancel against the most recent staged for a key
while self._stash:
op = pop_staged_op()
if self._items[op.key] and op.invert() == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
continue
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
continue # raise an error?
else:
append_op_needed(op)
unique_keys.add(op.key)
for op in ops_to_apply:
if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1] == op.invert():
self._items[op.key].pop()
if not self._items[op.key]:
self._items.pop(op.key)
continue
if not self._enforce_integrity:
self._items[op.key].append(op)
continue
stored_val = existing[op.key]
has_stored_val = stored_val is not None
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
try:
if op.is_delete:
if has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
elif not stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
else:
if has_stored_val and not will_delete_existing_stored:
raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}")
if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1].is_put:
raise OpStackIntegrity(f"db op tries to overwrite with {op} before deleting pending "
f"{self._items[op.key][-1]}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.debug(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op)
self._stashed_last_op_for_key.clear()
def append_op(self, op: RevertableOp):
"""
Apply a put or delete op, checking that it introduces no integrity errors
@ -217,15 +284,10 @@ class RevertableOpStack:
raise err
self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]):
"""
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
"""
for op in ops:
self.append_op(op)
def clear(self):
self._items.clear()
self._stash.clear()
self._stashed_last_op_for_key.clear()
def __len__(self):
return sum(map(len, self._items.values()))
@ -254,6 +316,8 @@ class RevertableOpStack:
op, packed = RevertableOp.unpack(packed)
self.append_op(op)
def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]:
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
if key in self._stashed_last_op_for_key:
return self._stashed_last_op_for_key[key]
if key in self._items and self._items[key]:
return self._items[key][-1]

View file

@ -46,7 +46,7 @@ class PrimaryDB(SecondaryDB):
if start_height <= 0:
self.logger.info("loading all blockchain addresses, this will take a little while...")
hashXs = [hashX for hashX in hashX_iterator()]
hashXs = list({hashX for hashX in hashX_iterator()})
else:
self.logger.info("loading addresses since block %i...", start_height)
hashXs = set()
@ -68,11 +68,11 @@ class PrimaryDB(SecondaryDB):
if existing_status and existing_status == status:
continue
elif not existing_status:
prefix_db.stage_raw_put(key, status)
prefix_db.stash_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stage_raw_delete(key, existing_status)
prefix_db.stage_raw_put(key, status)
prefix_db.stash_raw_delete(key, existing_status)
prefix_db.stash_raw_put(key, status)
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
@ -93,8 +93,8 @@ class PrimaryDB(SecondaryDB):
def apply_expiration_extension_fork(self):
# TODO: this can't be reorged
for k, v in self.prefix_db.claim_expiration.iterate():
self.prefix_db.claim_expiration.stage_delete(k, v)
self.prefix_db.claim_expiration.stage_put(
self.prefix_db.claim_expiration.stash_delete(k, v)
self.prefix_db.claim_expiration.stash_put(
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
k.tx_num, k.position), v
)
@ -104,8 +104,8 @@ class PrimaryDB(SecondaryDB):
"""Write (UTXO) state to the batch."""
if self.db_height > 0:
existing = self.prefix_db.db_state.get()
self.prefix_db.db_state.stage_delete((), existing.expanded)
self.prefix_db.db_state.stage_put((), (
self.prefix_db.db_state.stash_delete((), existing.expanded)
self.prefix_db.db_state.stash_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,

View file

@ -171,9 +171,9 @@ class BlockchainProcessorService(BlockchainService):
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
for tx_hash, raw_tx in to_put:
mempool_prefix.stage_put((tx_hash,), (raw_tx,))
mempool_prefix.stash_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
mempool_prefix.stage_delete((tx_hash,), (raw_tx,))
mempool_prefix.stash_delete((tx_hash,), (raw_tx,))
unsafe_commit()
async with self.lock:
@ -403,69 +403,66 @@ class BlockchainProcessorService(BlockchainService):
def get_add_claim_utxo_ops(self, pending: StagedClaimtrieItem):
# claim tip by claim hash
self.db.prefix_db.claim_to_txo.stage_put(
self.db.prefix_db.claim_to_txo.stash_put(
(pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position,
pending.amount, pending.channel_signature_is_valid, pending.name)
)
# claim hash by txo
self.db.prefix_db.txo_to_claim.stage_put(
self.db.prefix_db.txo_to_claim.stash_put(
(pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name)
)
# claim expiration
self.db.prefix_db.claim_expiration.stage_put(
self.db.prefix_db.claim_expiration.stash_put(
(pending.expiration_height, pending.tx_num, pending.position),
(pending.claim_hash, pending.normalized_name)
)
# short url resolution
self.db.prefix_db.claim_short_id.stage_multi_put(
[
(
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
pending.root_tx_num, pending.root_position),
(pending.tx_num, pending.position)
) for prefix_len in range(10)
]
for prefix_len in range(10):
self.db.prefix_db.claim_short_id.stash_put(
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
pending.root_tx_num, pending.root_position),
(pending.tx_num, pending.position)
)
if pending.signing_hash and pending.channel_signature_is_valid:
# channel by stream
self.db.prefix_db.claim_to_channel.stage_put(
self.db.prefix_db.claim_to_channel.stash_put(
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
)
# stream by channel
self.db.prefix_db.channel_to_claim.stage_put(
self.db.prefix_db.channel_to_claim.stash_put(
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
(pending.claim_hash,)
)
if pending.reposted_claim_hash:
self.db.prefix_db.repost.stage_put((pending.claim_hash,), (pending.reposted_claim_hash,))
self.db.prefix_db.reposted_claim.stage_put(
self.db.prefix_db.repost.stash_put((pending.claim_hash,), (pending.reposted_claim_hash,))
self.db.prefix_db.reposted_claim.stash_put(
(pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
)
def get_remove_claim_utxo_ops(self, pending: StagedClaimtrieItem):
# claim tip by claim hash
self.db.prefix_db.claim_to_txo.stage_delete(
self.db.prefix_db.claim_to_txo.stash_delete(
(pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position,
pending.amount, pending.channel_signature_is_valid, pending.name)
)
# claim hash by txo
self.db.prefix_db.txo_to_claim.stage_delete(
self.db.prefix_db.txo_to_claim.stash_delete(
(pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name)
)
# claim expiration
self.db.prefix_db.claim_expiration.stage_delete(
self.db.prefix_db.claim_expiration.stash_delete(
(pending.expiration_height, pending.tx_num, pending.position),
(pending.claim_hash, pending.normalized_name)
)
# short url resolution
for prefix_len in range(10): # TODO: multi_delete
self.db.prefix_db.claim_short_id.stage_delete(
for prefix_len in range(10):
self.db.prefix_db.claim_short_id.stash_delete(
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
pending.root_tx_num, pending.root_position),
(pending.tx_num, pending.position)
@ -473,18 +470,18 @@ class BlockchainProcessorService(BlockchainService):
if pending.signing_hash and pending.channel_signature_is_valid:
# channel by stream
self.db.prefix_db.claim_to_channel.stage_delete(
self.db.prefix_db.claim_to_channel.stash_delete(
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
)
# stream by channel
self.db.prefix_db.channel_to_claim.stage_delete(
self.db.prefix_db.channel_to_claim.stash_delete(
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
(pending.claim_hash,)
)
if pending.reposted_claim_hash:
self.db.prefix_db.repost.stage_delete((pending.claim_hash,), (pending.reposted_claim_hash,))
self.db.prefix_db.reposted_claim.stage_delete(
self.db.prefix_db.repost.stash_delete((pending.claim_hash,), (pending.reposted_claim_hash,))
self.db.prefix_db.reposted_claim.stash_delete(
(pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
)
self.reposted_count_delta[pending.reposted_claim_hash] -= 1
@ -495,8 +492,8 @@ class BlockchainProcessorService(BlockchainService):
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.value
# print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.value}")
self.db.prefix_db.claim_to_support.stage_put((supported_claim_hash, tx_num, nout), (txo.value,))
self.db.prefix_db.support_to_claim.stage_put((tx_num, nout), (supported_claim_hash,))
self.db.prefix_db.claim_to_support.stash_put((supported_claim_hash, tx_num, nout), (txo.value,))
self.db.prefix_db.support_to_claim.stash_put((tx_num, nout), (supported_claim_hash,))
self.pending_support_amount_change[supported_claim_hash] += txo.value
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'TxOutput',
@ -604,20 +601,20 @@ class BlockchainProcessorService(BlockchainService):
def _get_invalidate_signature_ops(self, pending: StagedClaimtrieItem):
if not pending.signing_hash:
return
self.db.prefix_db.claim_to_channel.stage_delete(
self.db.prefix_db.claim_to_channel.stash_delete(
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
)
if pending.channel_signature_is_valid:
self.db.prefix_db.channel_to_claim.stage_delete(
self.db.prefix_db.channel_to_claim.stash_delete(
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
(pending.claim_hash,)
)
self.db.prefix_db.claim_to_txo.stage_delete(
self.db.prefix_db.claim_to_txo.stash_delete(
(pending.claim_hash,),
(pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount,
pending.channel_signature_is_valid, pending.name)
)
self.db.prefix_db.claim_to_txo.stage_put(
self.db.prefix_db.claim_to_txo.stash_put(
(pending.claim_hash,),
(pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount,
False, pending.name)
@ -750,25 +747,25 @@ class BlockchainProcessorService(BlockchainService):
def get_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int,
activation_height: int, name: str, amount: int):
self.db.prefix_db.activated.stage_put(
self.db.prefix_db.activated.stash_put(
(txo_type, tx_num, position), (activation_height, claim_hash, name)
)
self.db.prefix_db.pending_activation.stage_put(
self.db.prefix_db.pending_activation.stash_put(
(activation_height, txo_type, tx_num, position), (claim_hash, name)
)
self.db.prefix_db.active_amount.stage_put(
self.db.prefix_db.active_amount.stash_put(
(claim_hash, txo_type, activation_height, tx_num, position), (amount,)
)
def get_remove_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int,
activation_height: int, name: str, amount: int):
self.db.prefix_db.activated.stage_delete(
self.db.prefix_db.activated.stash_delete(
(txo_type, tx_num, position), (activation_height, claim_hash, name)
)
self.db.prefix_db.pending_activation.stage_delete(
self.db.prefix_db.pending_activation.stash_delete(
(activation_height, txo_type, tx_num, position), (claim_hash, name)
)
self.db.prefix_db.active_amount.stage_delete(
self.db.prefix_db.active_amount.stash_delete(
(claim_hash, txo_type, activation_height, tx_num, position), (amount,)
)
@ -979,7 +976,7 @@ class BlockchainProcessorService(BlockchainService):
if not has_candidate:
# remove name takeover entry, the name is now unclaimed
controlling = get_controlling(need_takeover)
self.db.prefix_db.claim_takeover.stage_delete(
self.db.prefix_db.claim_takeover.stash_delete(
(need_takeover,), (controlling.claim_hash, controlling.height)
)
@ -1104,10 +1101,10 @@ class BlockchainProcessorService(BlockchainService):
self.taken_over_names.add(name)
if controlling:
self.db.prefix_db.claim_takeover.stage_delete(
self.db.prefix_db.claim_takeover.stash_delete(
(name,), (controlling.claim_hash, controlling.height)
)
self.db.prefix_db.claim_takeover.stage_put((name,), (winning_including_future_activations, height))
self.db.prefix_db.claim_takeover.stash_put((name,), (winning_including_future_activations, height))
self.touched_claim_hashes.add(winning_including_future_activations)
if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash)
@ -1172,10 +1169,10 @@ class BlockchainProcessorService(BlockchainService):
self.taken_over_names.add(name)
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
if controlling:
self.db.prefix_db.claim_takeover.stage_delete(
self.db.prefix_db.claim_takeover.stash_delete(
(name,), (controlling.claim_hash, controlling.height)
)
self.db.prefix_db.claim_takeover.stage_put((name,), (winning, height))
self.db.prefix_db.claim_takeover.stash_put((name,), (winning, height))
if controlling:
self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning)
@ -1208,7 +1205,7 @@ class BlockchainProcessorService(BlockchainService):
if delta != 0:
new_count = existing_repost_counts.get(claim_hash, 0) + delta
repost_count_puts.append(((claim_hash,), (new_count,)))
self.db.prefix_db.reposted_count.stage_multi_put(repost_count_puts)
self.db.prefix_db.reposted_count.stash_multi_put(repost_count_puts)
# gather cumulative removed/touched sets to update the search index
self.removed_claim_hashes.update(set(self.abandoned_claims.keys()))
@ -1239,8 +1236,8 @@ class BlockchainProcessorService(BlockchainService):
total = amount
if existing is not None:
total += existing.amount
self.db.prefix_db.support_amount.stage_delete((supported_claim,), existing)
self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,))
self.db.prefix_db.support_amount.stash_delete((supported_claim,), existing)
self.db.prefix_db.support_amount.stash_put((supported_claim,), (total,))
# use the cumulative changes to update bid ordered resolve
#
@ -1252,7 +1249,7 @@ class BlockchainProcessorService(BlockchainService):
removed_claim.normalized_name, removed
)
if amt:
self.db.prefix_db.bid_order.stage_delete(
self.db.prefix_db.bid_order.stash_delete(
(removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,)
)
# update or insert new bid orders and prepare to update the effective amount index
@ -1267,7 +1264,7 @@ class BlockchainProcessorService(BlockchainService):
claim_amount_info = self.db.get_url_effective_amount(name, touched)
if claim_amount_info:
prev_effective_amount = claim_amount_info.effective_amount
self.db.prefix_db.bid_order.stage_delete(
self.db.prefix_db.bid_order.stash_delete(
(name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
claim_amount_info.position), (touched,)
)
@ -1279,19 +1276,19 @@ class BlockchainProcessorService(BlockchainService):
amt = self.db.get_url_effective_amount(name, touched)
if amt:
prev_effective_amount = amt.effective_amount
self.db.prefix_db.bid_order.stage_delete(
self.db.prefix_db.bid_order.stash_delete(
(name, prev_effective_amount, amt.tx_num, amt.position), (touched,)
)
new_effective_amount = self._get_pending_effective_amount(name, touched)
self.db.prefix_db.bid_order.stage_put(
new_effective_amount = touched_effective_amounts.get(touched, 0)
self.db.prefix_db.bid_order.stash_put(
(name, new_effective_amount, tx_num, position), (touched,)
)
if touched in self.claim_hash_to_txo or touched in self.removed_claim_hashes \
or touched in self.pending_support_amount_change:
# exclude sending notifications for claims/supports that activated but
# weren't added/spent in this block
self.db.prefix_db.trending_notification.stage_put(
self.db.prefix_db.trending_notification.stash_put(
(height, touched), (prev_effective_amount, new_effective_amount)
)
@ -1333,7 +1330,7 @@ class BlockchainProcessorService(BlockchainService):
if delete_effective_amounts:
self.db.prefix_db.multi_delete(delete_effective_amounts)
if new_effective_amounts:
self.db.prefix_db.effective_amount.stage_multi_put(
self.db.prefix_db.effective_amount.stash_multi_put(
[((claim_hash,), (amount, support_sum)) for claim_hash, (amount, support_sum) in new_effective_amounts.items()]
)
@ -1343,8 +1340,8 @@ class BlockchainProcessorService(BlockchainService):
channel_count_val = self.db.prefix_db.channel_count.get(channel_hash)
channel_count = 0 if not channel_count_val else channel_count_val.count
if channel_count_val is not None:
self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,))
self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,))
self.db.prefix_db.channel_count.stash_delete((channel_hash,), (channel_count,))
self.db.prefix_db.channel_count.stash_put((channel_hash,), (channel_count + count,))
# update the sets of touched and removed claims for es sync
self.touched_claim_hashes.update(
@ -1386,11 +1383,11 @@ class BlockchainProcessorService(BlockchainService):
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block):
height = self.height + 1
@ -1404,9 +1401,9 @@ class BlockchainProcessorService(BlockchainService):
txs: List[Tuple[Tx, bytes]] = block.transactions
txo_count = 0
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
self.db.prefix_db.block_hash.stash_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
self.db.prefix_db.header.stash_put(key_args=(height,), value_args=(block.header,))
self.db.prefix_db.block_txs.stash_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
for tx, tx_hash in txs:
spent_claims = {}
@ -1414,11 +1411,12 @@ class BlockchainProcessorService(BlockchainService):
# leave txs in mempool that werent in the block
mempool_tx = self.db.prefix_db.mempool_tx.get_pending(tx_hash)
if mempool_tx:
self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), mempool_tx)
self.db.prefix_db.mempool_tx.stash_delete((tx_hash,), mempool_tx)
self.db.prefix_db.tx.stash_put(key_args=(tx_hash,), value_args=(tx.raw,))
self.db.prefix_db.tx_num.stash_put(key_args=(tx_hash,), value_args=(tx_count,))
self.db.prefix_db.tx_hash.stash_put(key_args=(tx_count,), value_args=(tx_hash,))
self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,))
self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,))
self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,))
# Spend the inputs
for txin in tx.inputs:
@ -1476,9 +1474,9 @@ class BlockchainProcessorService(BlockchainService):
# update effective amount and update sets of touched and deleted claims
self._get_cumulative_update_ops(height)
self.db.prefix_db.touched_hashX.stage_put((height,), (list(sorted(self.touched_hashXs)),))
self.db.prefix_db.touched_hashX.stash_put((height,), (list(sorted(self.touched_hashXs)),))
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
self.db.prefix_db.tx_count.stash_put(key_args=(height,), value_args=(tx_count,))
# clear the mempool tx index
self._get_clear_mempool_ops()
@ -1498,7 +1496,7 @@ class BlockchainProcessorService(BlockchainService):
cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit
# if height >= cached_max_reorg_depth:
self.db.prefix_db.touched_or_deleted.stage_put(
self.db.prefix_db.touched_or_deleted.stash_put(
key_args=(height,), value_args=(self.touched_claim_hashes, self.removed_claim_hashes)
)
@ -1538,7 +1536,7 @@ class BlockchainProcessorService(BlockchainService):
)
def _get_update_hashX_histories_ops(self, height: int):
self.db.prefix_db.hashX_history.stage_multi_put(
self.db.prefix_db.hashX_history.stash_multi_put(
[((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()]
)
@ -1584,7 +1582,6 @@ class BlockchainProcessorService(BlockchainService):
history_item_0 = (k, hist)
elif hist_height > 0:
needs_compaction = True
# self.db.prefix_db.stage_raw_delete(k, hist)
append_deletes_hashX_history((k, hist))
existing_item_0 = history_item_0
if needs_compaction:
@ -1615,10 +1612,10 @@ class BlockchainProcessorService(BlockchainService):
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts)
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
def clear_after_advance_or_reorg(self):
self.txo_to_claim.clear()
@ -1733,8 +1730,8 @@ class BlockchainProcessorService(BlockchainService):
if hashX:
self.touched_hashXs.add(hashX)
self.utxo_cache[(tx_hash, nout)] = (hashX, txout.value)
self.db.prefix_db.utxo.stage_put((hashX, tx_num, nout), (txout.value,))
self.db.prefix_db.hashX_utxo.stage_put((tx_hash[:4], tx_num, nout), (hashX,))
self.db.prefix_db.utxo.stash_put((hashX, tx_num, nout), (txout.value,))
self.db.prefix_db.hashX_utxo.stash_put((tx_hash[:4], tx_num, nout), (hashX,))
return hashX
def get_pending_tx_num(self, tx_hash: bytes) -> int: