only save undo info for blocks within reorg limit
This commit is contained in:
parent
9ec510c742
commit
cb5c39a159
5 changed files with 24 additions and 8 deletions
|
@ -417,9 +417,14 @@ class BlockProcessor:
|
|||
await self.prefetcher.reset_height(self.height)
|
||||
|
||||
async def flush(self):
|
||||
save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit
|
||||
|
||||
def flush():
|
||||
self.db.write_db_state()
|
||||
self.db.prefix_db.commit(self.height)
|
||||
if save_undo:
|
||||
self.db.prefix_db.commit(self.height)
|
||||
else:
|
||||
self.db.prefix_db.unsafe_commit()
|
||||
self.clear_after_advance_or_reorg()
|
||||
self.db.assert_db_state()
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
|
|
@ -26,9 +26,10 @@ class KeyValueStorage:
|
|||
class PrefixDB:
|
||||
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
||||
|
||||
def __init__(self, db: KeyValueStorage, unsafe_prefixes=None):
|
||||
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||
self._db = db
|
||||
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
||||
self._max_undo_depth = max_undo_depth
|
||||
|
||||
def unsafe_commit(self):
|
||||
"""
|
||||
|
@ -52,6 +53,13 @@ class PrefixDB:
|
|||
Write changes for a block height to the database and keep undo information so that the changes can be reverted
|
||||
"""
|
||||
undo_ops = self._op_stack.get_undo_ops()
|
||||
delete_undos = []
|
||||
if height > self._max_undo_depth:
|
||||
delete_undos.extend(self._db.iterator(
|
||||
start=DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(0),
|
||||
stop=DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height - self._max_undo_depth),
|
||||
include_value=False
|
||||
))
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
|
@ -61,6 +69,8 @@ class PrefixDB:
|
|||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
for undo_to_delete in delete_undos:
|
||||
batch_delete(undo_to_delete)
|
||||
batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops)
|
||||
finally:
|
||||
self._op_stack.clear()
|
||||
|
|
|
@ -1570,10 +1570,10 @@ class LevelDBStore(KeyValueStorage):
|
|||
|
||||
|
||||
class HubDB(PrefixDB):
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int = 512,
|
||||
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
|
||||
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||
db = LevelDBStore(path, cache_mb, max_open_files)
|
||||
super().__init__(db, unsafe_prefixes=unsafe_prefixes)
|
||||
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
|
||||
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
||||
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
||||
self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack)
|
||||
|
|
|
@ -807,7 +807,8 @@ class LevelDB:
|
|||
return
|
||||
|
||||
self.prefix_db = HubDB(
|
||||
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.cache_MB, max_open_files=512
|
||||
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.reorg_limit, self.env.cache_MB,
|
||||
max_open_files=512
|
||||
)
|
||||
self.logger.info(f'opened db: lbry-leveldb')
|
||||
|
||||
|
|
|
@ -32,9 +32,9 @@ class TestRevertableOpStack(unittest.TestCase):
|
|||
key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20)
|
||||
key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20)
|
||||
|
||||
val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'derp')
|
||||
val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'oops')
|
||||
val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'other')
|
||||
val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'derp')
|
||||
val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'oops')
|
||||
val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'other')
|
||||
|
||||
# check that we can't delete a non existent value
|
||||
with self.assertRaises(OpStackIntegrity):
|
||||
|
|
Loading…
Reference in a new issue