skip integrity errors for trending spikes
This commit is contained in:
parent
34576e880d
commit
1940301824
2 changed files with 23 additions and 14 deletions
|
@ -1,10 +1,12 @@
|
||||||
import struct
|
import struct
|
||||||
|
import logging
|
||||||
from string import printable
|
from string import printable
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import Tuple, Iterable, Callable, Optional
|
from typing import Tuple, Iterable, Callable, Optional
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
from lbry.wallet.server.db import DB_PREFIXES
|
||||||
|
|
||||||
_OP_STRUCT = struct.Struct('>BLL')
|
_OP_STRUCT = struct.Struct('>BLL')
|
||||||
|
log = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
class RevertableOp:
|
class RevertableOp:
|
||||||
|
@ -80,9 +82,10 @@ class OpStackIntegrity(Exception):
|
||||||
|
|
||||||
|
|
||||||
class RevertableOpStack:
|
class RevertableOpStack:
|
||||||
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]):
|
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
|
||||||
self._get = get_fn
|
self._get = get_fn
|
||||||
self._items = defaultdict(list)
|
self._items = defaultdict(list)
|
||||||
|
self._unsafe_prefixes = unsafe_prefixes or set()
|
||||||
|
|
||||||
def append_op(self, op: RevertableOp):
|
def append_op(self, op: RevertableOp):
|
||||||
inverted = op.invert()
|
inverted = op.invert()
|
||||||
|
@ -95,18 +98,24 @@ class RevertableOpStack:
|
||||||
has_stored_val = stored_val is not None
|
has_stored_val = stored_val is not None
|
||||||
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
|
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
|
||||||
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
|
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
|
||||||
if op.is_put and has_stored_val and not will_delete_existing_stored:
|
try:
|
||||||
raise OpStackIntegrity(
|
if op.is_put and has_stored_val and not will_delete_existing_stored:
|
||||||
f"db op tries to add on top of existing key without deleting first: {op}"
|
raise OpStackIntegrity(
|
||||||
)
|
f"db op tries to add on top of existing key without deleting first: {op}"
|
||||||
elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
|
)
|
||||||
# there is a value and we're not deleting it in this op
|
elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
|
||||||
# check that a delete for the stored value is in the stack
|
# there is a value and we're not deleting it in this op
|
||||||
raise OpStackIntegrity(f"delete {op}")
|
# check that a delete for the stored value is in the stack
|
||||||
elif op.is_delete and not has_stored_val:
|
raise OpStackIntegrity(f"delete {op}")
|
||||||
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
|
elif op.is_delete and not has_stored_val:
|
||||||
elif op.is_delete and stored_val != op.value:
|
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
|
||||||
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
|
elif op.is_delete and stored_val != op.value:
|
||||||
|
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
|
||||||
|
except OpStackIntegrity as err:
|
||||||
|
if op.key[:1] in self._unsafe_prefixes:
|
||||||
|
log.error(f"skipping over integrity error: {err}")
|
||||||
|
else:
|
||||||
|
raise err
|
||||||
self._items[op.key].append(op)
|
self._items[op.key].append(op)
|
||||||
|
|
||||||
def extend_ops(self, ops: Iterable[RevertableOp]):
|
def extend_ops(self, ops: Iterable[RevertableOp]):
|
||||||
|
|
|
@ -860,7 +860,7 @@ class LevelDB:
|
||||||
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
||||||
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
||||||
)
|
)
|
||||||
self.db_op_stack = RevertableOpStack(self.db.get)
|
self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value})
|
||||||
self.prefix_db = PrefixDB(self.db, self.db_op_stack)
|
self.prefix_db = PrefixDB(self.db, self.db_op_stack)
|
||||||
|
|
||||||
if is_new:
|
if is_new:
|
||||||
|
|
Loading…
Reference in a new issue