diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 0b57eedf3..370bc5f1e 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -417,9 +417,14 @@ class BlockProcessor: await self.prefetcher.reset_height(self.height) async def flush(self): + save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit + def flush(): self.db.write_db_state() - self.db.prefix_db.commit(self.height) + if save_undo: + self.db.prefix_db.commit(self.height) + else: + self.db.prefix_db.unsafe_commit() self.clear_after_advance_or_reorg() self.db.assert_db_state() await self.run_in_thread_with_lock(flush) diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py index 3d945bb48..6d613df93 100644 --- a/lbry/wallet/server/db/db.py +++ b/lbry/wallet/server/db/db.py @@ -26,9 +26,10 @@ class KeyValueStorage: class PrefixDB: UNDO_KEY_STRUCT = struct.Struct(b'>Q') - def __init__(self, db: KeyValueStorage, unsafe_prefixes=None): + def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None): self._db = db self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes) + self._max_undo_depth = max_undo_depth def unsafe_commit(self): """ @@ -52,6 +53,13 @@ class PrefixDB: Write changes for a block height to the database and keep undo information so that the changes can be reverted """ undo_ops = self._op_stack.get_undo_ops() + delete_undos = [] + if height > self._max_undo_depth: + delete_undos.extend(self._db.iterator( + start=DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(0), + stop=DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height - self._max_undo_depth), + include_value=False + )) try: with self._db.write_batch(transaction=True) as batch: batch_put = batch.put @@ -61,6 +69,8 @@ class PrefixDB: batch_put(staged_change.key, staged_change.value) else: batch_delete(staged_change.key) + for undo_to_delete in delete_undos: + batch_delete(undo_to_delete) batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops) finally: self._op_stack.clear() diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index ec401d6cf..c50f56692 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -1570,10 +1570,10 @@ class LevelDBStore(KeyValueStorage): class HubDB(PrefixDB): - def __init__(self, path: str, cache_mb: int, max_open_files: int = 512, + def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, unsafe_prefixes: Optional[typing.Set[bytes]] = None): db = LevelDBStore(path, cache_mb, max_open_files) - super().__init__(db, unsafe_prefixes=unsafe_prefixes) + super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes) self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack) self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack) self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 093254ea8..70245bb14 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -807,7 +807,8 @@ class LevelDB: return self.prefix_db = HubDB( - os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.cache_MB, max_open_files=512 + os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.reorg_limit, self.env.cache_MB, + max_open_files=512 ) self.logger.info(f'opened db: lbry-leveldb') diff --git a/tests/unit/wallet/server/test_revertable.py b/tests/unit/wallet/server/test_revertable.py index 42318b53b..f5729689a 100644 --- a/tests/unit/wallet/server/test_revertable.py +++ b/tests/unit/wallet/server/test_revertable.py @@ -32,9 +32,9 @@ class TestRevertableOpStack(unittest.TestCase): key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20) key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20) - val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'derp') - val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'oops') - val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'other') + val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'derp') + val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'oops') + val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'other') # check that we can't delete a non existent value with self.assertRaises(OpStackIntegrity):