forked from LBRYCommunity/lbry-sdk
only save undo info for blocks within reorg limit
This commit is contained in:
parent
a7c45da10c
commit
ccf03fc07b
5 changed files with 24 additions and 8 deletions
|
@ -417,9 +417,14 @@ class BlockProcessor:
|
||||||
await self.prefetcher.reset_height(self.height)
|
await self.prefetcher.reset_height(self.height)
|
||||||
|
|
||||||
async def flush(self):
|
async def flush(self):
|
||||||
|
save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit
|
||||||
|
|
||||||
def flush():
|
def flush():
|
||||||
self.db.write_db_state()
|
self.db.write_db_state()
|
||||||
self.db.prefix_db.commit(self.height)
|
if save_undo:
|
||||||
|
self.db.prefix_db.commit(self.height)
|
||||||
|
else:
|
||||||
|
self.db.prefix_db.unsafe_commit()
|
||||||
self.clear_after_advance_or_reorg()
|
self.clear_after_advance_or_reorg()
|
||||||
self.db.assert_db_state()
|
self.db.assert_db_state()
|
||||||
await self.run_in_thread_with_lock(flush)
|
await self.run_in_thread_with_lock(flush)
|
||||||
|
|
|
@ -26,9 +26,10 @@ class KeyValueStorage:
|
||||||
class PrefixDB:
|
class PrefixDB:
|
||||||
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
||||||
|
|
||||||
def __init__(self, db: KeyValueStorage, unsafe_prefixes=None):
|
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||||
self._db = db
|
self._db = db
|
||||||
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
||||||
|
self._max_undo_depth = max_undo_depth
|
||||||
|
|
||||||
def unsafe_commit(self):
|
def unsafe_commit(self):
|
||||||
"""
|
"""
|
||||||
|
@ -52,6 +53,13 @@ class PrefixDB:
|
||||||
Write changes for a block height to the database and keep undo information so that the changes can be reverted
|
Write changes for a block height to the database and keep undo information so that the changes can be reverted
|
||||||
"""
|
"""
|
||||||
undo_ops = self._op_stack.get_undo_ops()
|
undo_ops = self._op_stack.get_undo_ops()
|
||||||
|
delete_undos = []
|
||||||
|
if height > self._max_undo_depth:
|
||||||
|
delete_undos.extend(self._db.iterator(
|
||||||
|
start=DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(0),
|
||||||
|
stop=DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height - self._max_undo_depth),
|
||||||
|
include_value=False
|
||||||
|
))
|
||||||
try:
|
try:
|
||||||
with self._db.write_batch(transaction=True) as batch:
|
with self._db.write_batch(transaction=True) as batch:
|
||||||
batch_put = batch.put
|
batch_put = batch.put
|
||||||
|
@ -61,6 +69,8 @@ class PrefixDB:
|
||||||
batch_put(staged_change.key, staged_change.value)
|
batch_put(staged_change.key, staged_change.value)
|
||||||
else:
|
else:
|
||||||
batch_delete(staged_change.key)
|
batch_delete(staged_change.key)
|
||||||
|
for undo_to_delete in delete_undos:
|
||||||
|
batch_delete(undo_to_delete)
|
||||||
batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops)
|
batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops)
|
||||||
finally:
|
finally:
|
||||||
self._op_stack.clear()
|
self._op_stack.clear()
|
||||||
|
|
|
@ -1570,10 +1570,10 @@ class LevelDBStore(KeyValueStorage):
|
||||||
|
|
||||||
|
|
||||||
class HubDB(PrefixDB):
|
class HubDB(PrefixDB):
|
||||||
def __init__(self, path: str, cache_mb: int, max_open_files: int = 512,
|
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
|
||||||
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||||
db = LevelDBStore(path, cache_mb, max_open_files)
|
db = LevelDBStore(path, cache_mb, max_open_files)
|
||||||
super().__init__(db, unsafe_prefixes=unsafe_prefixes)
|
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
|
||||||
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
||||||
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
||||||
self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack)
|
self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack)
|
||||||
|
|
|
@ -807,7 +807,8 @@ class LevelDB:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.prefix_db = HubDB(
|
self.prefix_db = HubDB(
|
||||||
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.cache_MB, max_open_files=512
|
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.reorg_limit, self.env.cache_MB,
|
||||||
|
max_open_files=512
|
||||||
)
|
)
|
||||||
self.logger.info(f'opened db: lbry-leveldb')
|
self.logger.info(f'opened db: lbry-leveldb')
|
||||||
|
|
||||||
|
|
|
@ -32,9 +32,9 @@ class TestRevertableOpStack(unittest.TestCase):
|
||||||
key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20)
|
key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20)
|
||||||
key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20)
|
key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20)
|
||||||
|
|
||||||
val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'derp')
|
val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'derp')
|
||||||
val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'oops')
|
val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'oops')
|
||||||
val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'other')
|
val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, False, 'other')
|
||||||
|
|
||||||
# check that we can't delete a non existent value
|
# check that we can't delete a non existent value
|
||||||
with self.assertRaises(OpStackIntegrity):
|
with self.assertRaises(OpStackIntegrity):
|
||||||
|
|
Loading…
Reference in a new issue