From 1940301824f26ac1c9ec8efaf7979a8e6e1afb97 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 30 Aug 2021 12:22:53 -0400
Subject: [PATCH] skip integrity errors for trending spikes

---
 lbry/wallet/server/db/revertable.py | 35 ++++++++++++++++++-----------
 lbry/wallet/server/leveldb.py       |  2 +-
 2 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py
index 7365b8dbe..e82c36f12 100644
--- a/lbry/wallet/server/db/revertable.py
+++ b/lbry/wallet/server/db/revertable.py
@@ -1,10 +1,12 @@
 import struct
+import logging
 from string import printable
 from collections import defaultdict
 from typing import Tuple, Iterable, Callable, Optional
 from lbry.wallet.server.db import DB_PREFIXES
 
 _OP_STRUCT = struct.Struct('>BLL')
+log = logging.getLogger()
 
 
 class RevertableOp:
@@ -80,9 +82,10 @@ class OpStackIntegrity(Exception):
 
 
 class RevertableOpStack:
-    def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]):
+    def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
         self._get = get_fn
         self._items = defaultdict(list)
+        self._unsafe_prefixes = unsafe_prefixes or set()
 
     def append_op(self, op: RevertableOp):
         inverted = op.invert()
@@ -95,18 +98,24 @@ class RevertableOpStack:
         has_stored_val = stored_val is not None
         delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
         will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
-        if op.is_put and has_stored_val and not will_delete_existing_stored:
-            raise OpStackIntegrity(
-                f"db op tries to add on top of existing key without deleting first: {op}"
-            )
-        elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
-            # there is a value and we're not deleting it in this op
-            # check that a delete for the stored value is in the stack
-            raise OpStackIntegrity(f"delete {op}")
-        elif op.is_delete and not has_stored_val:
-            raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
-        elif op.is_delete and stored_val != op.value:
-            raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
+        try:
+            if op.is_put and has_stored_val and not will_delete_existing_stored:
+                raise OpStackIntegrity(
+                    f"db op tries to add on top of existing key without deleting first: {op}"
+                )
+            elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
+                # there is a value and we're not deleting it in this op
+                # check that a delete for the stored value is in the stack
+                raise OpStackIntegrity(f"delete {op}")
+            elif op.is_delete and not has_stored_val:
+                raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
+            elif op.is_delete and stored_val != op.value:
+                raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
+        except OpStackIntegrity as err:
+            if op.key[:1] in self._unsafe_prefixes:
+                log.error(f"skipping over integrity error: {err}")
+            else:
+                raise err
         self._items[op.key].append(op)
 
     def extend_ops(self, ops: Iterable[RevertableOp]):
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index b21383156..18f70f39e 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -860,7 +860,7 @@ class LevelDB:
             lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
             max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
         )
-        self.db_op_stack = RevertableOpStack(self.db.get)
+        self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value})
         self.prefix_db = PrefixDB(self.db, self.db_op_stack)
 
         if is_new: