diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 7365b8dbe..e82c36f12 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -1,10 +1,12 @@ import struct +import logging from string import printable from collections import defaultdict from typing import Tuple, Iterable, Callable, Optional from lbry.wallet.server.db import DB_PREFIXES _OP_STRUCT = struct.Struct('>BLL') +log = logging.getLogger() class RevertableOp: @@ -80,9 +82,10 @@ class OpStackIntegrity(Exception): class RevertableOpStack: - def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]): + def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None): self._get = get_fn self._items = defaultdict(list) + self._unsafe_prefixes = unsafe_prefixes or set() def append_op(self, op: RevertableOp): inverted = op.invert() @@ -95,18 +98,24 @@ class RevertableOpStack: has_stored_val = stored_val is not None delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) - if op.is_put and has_stored_val and not will_delete_existing_stored: - raise OpStackIntegrity( - f"db op tries to add on top of existing key without deleting first: {op}" - ) - elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: - # there is a value and we're not deleting it in this op - # check that a delete for the stored value is in the stack - raise OpStackIntegrity(f"delete {op}") - elif op.is_delete and not has_stored_val: - raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") - elif op.is_delete and stored_val != op.value: - raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") + try: + if op.is_put and has_stored_val and not will_delete_existing_stored: + raise OpStackIntegrity( + f"db op tries to add on top of existing key without deleting first: {op}" + ) + elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: + # there is a value and we're not deleting it in this op + # check that a delete for the stored value is in the stack + raise OpStackIntegrity(f"delete {op}") + elif op.is_delete and not has_stored_val: + raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") + elif op.is_delete and stored_val != op.value: + raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") + except OpStackIntegrity as err: + if op.key[:1] in self._unsafe_prefixes: + log.error(f"skipping over integrity error: {err}") + else: + raise err self._items[op.key].append(op) def extend_ops(self, ops: Iterable[RevertableOp]): diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index b21383156..18f70f39e 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -860,7 +860,7 @@ class LevelDB: lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 ) - self.db_op_stack = RevertableOpStack(self.db.get) + self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value}) self.prefix_db = PrefixDB(self.db, self.db_op_stack) if is_new: