skip integrity errors for trending spikes

This commit is contained in:
Jack Robison 2021-08-30 12:22:53 -04:00 committed by Victor Shyba
parent 6f2b985b73
commit 0be141188c
2 changed files with 23 additions and 14 deletions

View file

@ -1,10 +1,12 @@
import struct import struct
import logging
from string import printable from string import printable
from collections import defaultdict from collections import defaultdict
from typing import Tuple, Iterable, Callable, Optional from typing import Tuple, Iterable, Callable, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
_OP_STRUCT = struct.Struct('>BLL') _OP_STRUCT = struct.Struct('>BLL')
log = logging.getLogger()
class RevertableOp: class RevertableOp:
@ -80,9 +82,10 @@ class OpStackIntegrity(Exception):
class RevertableOpStack: class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]): def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
self._get = get_fn self._get = get_fn
self._items = defaultdict(list) self._items = defaultdict(list)
self._unsafe_prefixes = unsafe_prefixes or set()
def append_op(self, op: RevertableOp): def append_op(self, op: RevertableOp):
inverted = op.invert() inverted = op.invert()
@ -95,6 +98,7 @@ class RevertableOpStack:
has_stored_val = stored_val is not None has_stored_val = stored_val is not None
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
try:
if op.is_put and has_stored_val and not will_delete_existing_stored: if op.is_put and has_stored_val and not will_delete_existing_stored:
raise OpStackIntegrity( raise OpStackIntegrity(
f"db op tries to add on top of existing key without deleting first: {op}" f"db op tries to add on top of existing key without deleting first: {op}"
@ -107,6 +111,11 @@ class RevertableOpStack:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif op.is_delete and stored_val != op.value: elif op.is_delete and stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.error(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op) self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]): def extend_ops(self, ops: Iterable[RevertableOp]):

View file

@ -860,7 +860,7 @@ class LevelDB:
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
) )
self.db_op_stack = RevertableOpStack(self.db.get) self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value})
self.prefix_db = PrefixDB(self.db, self.db_op_stack) self.prefix_db = PrefixDB(self.db, self.db_op_stack)
if is_new: if is_new: