add --db_disable_integrity_checks option to scribe

Jack Robison 2022-10-16 14:15:42 -04:00
parent a48564e3b2
commit 55eb8818ea
7 changed files with 37 additions and 12 deletions
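The flag is threaded top to bottom through the constructors touched below: --db_disable_integrity_checks sets BlockchainEnv.db_disable_integrity_checks, BlockchainProcessorService inverts it into enforce_integrity=not env.db_disable_integrity_checks when it constructs the database, and that keyword is passed through PrimaryDB, SecondaryDB, PrefixDB, and BasePrefixDB down to RevertableOpStack, where it gates the batched lookup of existing values used by the integrity checks.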


@@ -40,7 +40,8 @@ class SecondaryDB:
                  cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
-                 index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
+                 index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
+                 enforce_integrity=True):
         self.logger = logging.getLogger(__name__)
         self.coin = coin
         self._executor = executor
@@ -53,6 +54,7 @@ class SecondaryDB:
         assert max_open_files == -1, 'max open files must be -1 for secondary readers'
         self._db_max_open_files = max_open_files
         self._index_address_status = index_address_status
+        self._enforce_integrity = enforce_integrity
         self.prefix_db: typing.Optional[PrefixDB] = None

         self.hist_unflushed = defaultdict(partial(array.array, 'I'))
@@ -1016,7 +1018,7 @@ class SecondaryDB:
         self.prefix_db = PrefixDB(
             db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
             unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix},
-            secondary_path=secondary_path
+            secondary_path=secondary_path, enforce_integrity=self._enforce_integrity
         )
         if secondary_path != '':


@@ -183,7 +183,8 @@ class BasePrefixDB:
     UNDO_KEY_STRUCT = struct.Struct(b'>Q32s')
     PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q')

-    def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None):
+    def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None,
+                 enforce_integrity=True):
         column_family_options = {}
         for prefix in DB_PREFIXES:
             settings = COLUMN_SETTINGS[prefix.value]
@@ -206,7 +207,9 @@ class BasePrefixDB:
             cf = self._db.get_column_family(prefix.value)
             self.column_families[prefix.value] = cf

-        self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes)
+        self._op_stack = RevertableOpStack(
+            self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes, enforce_integrity=enforce_integrity
+        )
         self._max_undo_depth = max_undo_depth

     def unsafe_commit(self):


@@ -1852,9 +1852,11 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):

 class PrefixDB(BasePrefixDB):
     def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
-                 secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
+                 secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
+                 enforce_integrity: bool = True):
         super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path,
-                         max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes)
+                         max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes,
+                         enforce_integrity=enforce_integrity)
         db = self._db
         self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
         self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)


@@ -83,7 +83,8 @@ class OpStackIntegrity(Exception):

 class RevertableOpStack:
     def __init__(self, get_fn: Callable[[bytes], Optional[bytes]],
-                 multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None):
+                 multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None,
+                 enforce_integrity=True):
         """
         This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
         violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
@@ -103,6 +104,7 @@ class RevertableOpStack:
         self._stash: Deque[RevertableOp] = deque()
         self._stashed_last_op_for_key = {}
         self._unsafe_prefixes = unsafe_prefixes or set()
+        self._enforce_integrity = enforce_integrity

     def stash_ops(self, ops: Iterable[RevertableOp]):
         self._stash.extend(ops)
@@ -129,6 +131,14 @@ class RevertableOpStack:
             else:
                 append_op_needed(op)
                 unique_keys.add(op.key)
+
+        existing = {}
+        if self._enforce_integrity and unique_keys:
+            unique_keys = list(unique_keys)
+            existing.update({
+                k: v for k, v in zip(unique_keys, self._multi_get(unique_keys))
+            })
+
         for op in ops_to_apply:
             if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1] == op.invert():
                 self._items[op.key].pop()
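This hunk carries the actual behavior change: when enforce_integrity is false, the stack skips the batched multi_get prefetch of current database values, so the stashed puts and deletes are later applied without being cross-checked against what is already stored. A simplified, self-contained sketch of that gating, using stand-in names rather than the hub API:

# Simplified stand-in for the gating added above; prefetch_existing and the toy
# key/value store are illustrative only, not part of the hub codebase.
from typing import Callable, Dict, Iterable, List, Optional, Set


def prefetch_existing(multi_get: Callable[[List[bytes]], Iterable[Optional[bytes]]],
                      unique_keys: Set[bytes], enforce_integrity: bool) -> Dict[bytes, Optional[bytes]]:
    """Return the current value (or None) for every touched key, or nothing when checks are off."""
    existing: Dict[bytes, Optional[bytes]] = {}
    if enforce_integrity and unique_keys:
        keys = list(unique_keys)  # freeze the set's iteration order so keys and values zip up correctly
        existing.update({k: v for k, v in zip(keys, multi_get(keys))})
    return existing


# toy backend: only b'a' currently exists in the store
store = {b'a': b'1'}

def multi_get(keys: List[bytes]) -> List[Optional[bytes]]:
    return [store.get(k) for k in keys]

assert prefetch_existing(multi_get, {b'a', b'b'}, enforce_integrity=True) == {b'a': b'1', b'b': None}
assert prefetch_existing(multi_get, {b'a', b'b'}, enforce_integrity=False) == {}  # nothing to validate against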


@@ -13,9 +13,10 @@ class PrimaryDB(SecondaryDB):
                  cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                  max_open_files: int = 64, blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
-                 index_address_status=False):
+                 index_address_status=False, enforce_integrity=True):
         super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
-                         blocking_channel_ids, filtering_channel_ids, executor, index_address_status)
+                         blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
+                         enforce_integrity=enforce_integrity)

     def _rebuild_hashX_status_index(self, start_height: int):
         self.logger.warning("rebuilding the address status index...")


@@ -7,7 +7,8 @@ class BlockchainEnv(Env):
                  blocking_channel_ids=None, filtering_channel_ids=None,
                  db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
                  index_address_status=None, rebuild_address_status_from_height=None,
-                 daemon_ca_path=None, history_tx_cache_size=None):
+                 daemon_ca_path=None, history_tx_cache_size=None,
+                 db_disable_integrity_checks=False):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
                          cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.db_max_open_files = db_max_open_files
@@ -19,6 +20,7 @@ class BlockchainEnv(Env):
         self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None
         self.history_tx_cache_size = history_tx_cache_size if history_tx_cache_size is not None else \
             self.integer('HISTORY_TX_CACHE_SIZE', 4194304)
+        self.db_disable_integrity_checks = db_disable_integrity_checks

     @classmethod
     def contribute_to_arg_parser(cls, parser):
@@ -30,6 +32,9 @@ class BlockchainEnv(Env):
                             default=env_daemon_url)
         parser.add_argument('--daemon_ca_path', type=str, default='',
                             help='Path to the lbcd ca file, used for lbcd with ssl')
+        parser.add_argument('--db_disable_integrity_checks', action='store_true',
+                            help="Disable verifications that no db operation breaks the ability to be rewound",
+                            default=False)
         parser.add_argument('--db_max_open_files', type=int, default=64,
                             help='This setting translates into the max_open_files option given to rocksdb. '
                                  'A higher number will use more memory. Defaults to 64.')
@@ -55,5 +60,6 @@ class BlockchainEnv(Env):
             cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
             hashX_history_cache_size=args.address_history_cache_size,
             rebuild_address_status_from_height=args.rebuild_address_status_from_height,
-            daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size
+            daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,
+            db_disable_integrity_checks=args.db_disable_integrity_checks
         )
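The new option is a plain store_true flag. A standalone argparse sketch that mirrors the add_argument call above (outside of BlockchainEnv, just to show the resulting namespace field):

# Mirrors the argument added in this commit; this is not the scribe entry point itself.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--db_disable_integrity_checks', action='store_true', default=False,
                    help="Disable verifications that no db operation breaks the ability to be rewound")

assert parser.parse_args([]).db_disable_integrity_checks is False
assert parser.parse_args(['--db_disable_integrity_checks']).db_disable_integrity_checks is True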


@@ -138,7 +138,8 @@ class BlockchainProcessorService(BlockchainService):
            env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
            cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
            blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
-           executor=self._executor, index_address_status=env.index_address_status
+           executor=self._executor, index_address_status=env.index_address_status,
+           enforce_integrity=not env.db_disable_integrity_checks
        )

    async def run_in_thread_with_lock(self, func, *args):