lbry-sdk/lbry/wallet/server/db/interface.py

145 lines
6.6 KiB
Python
Raw Normal View History

2022-01-06 18:28:15 +01:00
import struct
2022-01-15 19:45:41 +01:00
import typing
2022-01-06 18:28:15 +01:00
import rocksdb
from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
2022-01-31 21:48:04 +01:00
class BasePrefixDB:
    """
    Base class for a revertable rocksdb database (a rocksdb db where each set of applied changes can be undone)

    Changes are first staged on an internal ``RevertableOpStack`` (see ``stage_raw_put`` /
    ``stage_raw_delete``) and are only written to rocksdb by ``commit``, ``unsafe_commit`` or
    ``rollback``. Each of those three methods clears the op stack when it finishes, whether or
    not the write succeeded.

    Keys are routed to a column family selected by their first byte, which must be the value
    of one of the ``DB_PREFIXES`` members.
    """

    # Full undo key: big-endian uint64 height followed by the 32 byte block hash.
    UNDO_KEY_STRUCT = struct.Struct(b'>Q32s')
    # Height-only part of an undo key, used to build the range bounds when pruning undo entries.
    PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q')

    def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None):
        """
        Open the database and resolve one column family handle per ``DB_PREFIXES`` member.

        :param path: database path handed to ``rocksdb.DB``
        :param max_open_files: rocksdb ``max_open_files`` option; replaced by ``-1`` when
            ``secondary_path`` is set
        :param secondary_path: passed to ``rocksdb.DB`` as ``secondary_name``; when it is set,
            missing column families are not created by this constructor
        :param max_undo_depth: once the committed height exceeds this value, ``commit`` prunes
            undo entries whose height is below ``height - max_undo_depth``
        :param unsafe_prefixes: forwarded unchanged to ``RevertableOpStack``
        """
        column_family_options = {
            prefix.value: rocksdb.ColumnFamilyOptions() for prefix in DB_PREFIXES
        }
        self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {}
        self._db = rocksdb.DB(
            path, rocksdb.Options(
                create_if_missing=True, use_fsync=False, target_file_size_base=33554432,
                max_open_files=max_open_files if not secondary_path else -1, create_missing_column_families=True
            ), secondary_name=secondary_path, column_families=column_family_options
        )
        for prefix in DB_PREFIXES:
            cf = self._db.get_column_family(prefix.value)
            if cf is None and not secondary_path:
                self._db.create_column_family(prefix.value, rocksdb.ColumnFamilyOptions())
                cf = self._db.get_column_family(prefix.value)
            # With a secondary_path the handle is stored as-is, so it may be None here.
            self.column_families[prefix.value] = cf

        self._op_stack = RevertableOpStack(self.get, unsafe_prefixes=unsafe_prefixes)
        self._max_undo_depth = max_undo_depth

    def _write_staged_changes(self, batch) -> None:
        """
        Add every operation currently on the op stack to ``batch``, in stack order.

        Shared by ``unsafe_commit``, ``commit`` and ``rollback``. The op stack itself is not
        modified; the callers clear it.
        """
        # Bound to locals once because the loop runs for every staged operation.
        batch_put = batch.put
        batch_delete = batch.delete
        get_column_family = self.column_families.__getitem__
        for staged_change in self._op_stack:
            # The first key byte names the prefix, which in turn selects the column family.
            column_family = get_column_family(DB_PREFIXES(staged_change.key[:1]).value)
            if staged_change.is_put:
                batch_put((column_family, staged_change.key), staged_change.value)
            else:
                batch_delete((column_family, staged_change.key))

    def unsafe_commit(self):
        """
        Write staged changes to the database without keeping undo information
        Changes written cannot be undone

        Returns without opening a write batch when nothing is staged. The op stack is cleared
        in every case.
        """
        try:
            if not len(self._op_stack):
                return
            with self._db.write_batch(sync=True) as batch:
                self._write_staged_changes(batch)
        finally:
            self._op_stack.clear()

    def commit(self, height: int, block_hash: bytes):
        """
        Write changes for a block height to the database and keep undo information so that the changes can be reverted

        In a single write batch this writes the staged changes, deletes the undo entries
        selected for pruning and stores the packed undo ops for this block under
        ``DB_PREFIXES.undo.value + UNDO_KEY_STRUCT.pack(height, block_hash)``.

        :param height: height of the block being committed
        :param block_hash: hash of the block; packed as the 32 byte part of the undo key
        """
        undo_ops = self._op_stack.get_undo_ops()

        # Collect the undo keys to prune before the write batch is opened.
        delete_undos = []
        if height > self._max_undo_depth:
            # NOTE(review): this iterator call passes no column_family while the deletes below
            # target the undo column family - confirm the default iteration covers those keys.
            delete_undos.extend(self._db.iterator(
                start=DB_PREFIXES.undo.value + self.PARTIAL_UNDO_KEY_STRUCT.pack(0),
                iterate_upper_bound=DB_PREFIXES.undo.value + self.PARTIAL_UNDO_KEY_STRUCT.pack(height - self._max_undo_depth),
                include_value=False
            ))

        try:
            undo_c_f = self.column_families[DB_PREFIXES.undo.value]
            with self._db.write_batch(sync=True) as batch:
                self._write_staged_changes(batch)
                for undo_to_delete in delete_undos:
                    batch.delete((undo_c_f, undo_to_delete))
                batch.put((undo_c_f, DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height, block_hash)), undo_ops)
        finally:
            self._op_stack.clear()

    def rollback(self, height: int, block_hash: bytes):
        """
        Revert changes for a block height

        Reads the undo record stored for ``(height, block_hash)``, stages the operations it
        contains on the op stack and writes them in one write batch.

        :param height: height of the block to revert
        :param block_hash: hash of the block to revert
        """
        undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height, block_hash)
        undo_c_f = self.column_families[DB_PREFIXES.undo.value]
        # NOTE(review): the result is passed on without checking that a record was found -
        # confirm how apply_packed_undo_ops behaves when the undo record is missing.
        undo_info = self._db.get((undo_c_f, undo_key))
        self._op_stack.apply_packed_undo_ops(undo_info)
        try:
            with self._db.write_batch(sync=True) as batch:
                self._write_staged_changes(batch)
                # NOTE(review): the undo record itself is not deleted here (the original code
                # carried a commented-out delete of undo_key) - confirm this is intended.
        finally:
            self._op_stack.clear()

    def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
        """
        Read ``key`` from the column family selected by its first byte.

        :param key: full key, including the one byte prefix
        :param fill_cache: forwarded to the underlying ``get``
        :return: whatever the underlying ``get`` returns for the key
        :raises KeyError: if the first byte of ``key`` has no entry in ``column_families``
        """
        cf = self.column_families[key[:1]]
        return self._db.get((cf, key), fill_cache=fill_cache)

    def iterator(self, start: bytes, column_family: Optional['rocksdb.ColumnFamilyHandle'] = None,
                 iterate_lower_bound: Optional[bytes] = None, iterate_upper_bound: Optional[bytes] = None,
                 reverse: bool = False, include_key: bool = True, include_value: bool = True,
                 fill_cache: bool = True, prefix_same_as_start: bool = False, auto_prefix_mode: bool = False):
        """
        Return an iterator from the underlying database.

        Every argument is forwarded unchanged, by keyword, to ``self._db.iterator``.
        """
        return self._db.iterator(
            start=start, column_family=column_family, iterate_lower_bound=iterate_lower_bound,
            iterate_upper_bound=iterate_upper_bound, reverse=reverse, include_key=include_key,
            include_value=include_value, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start,
            auto_prefix_mode=auto_prefix_mode
        )

    def close(self):
        """Close the underlying database handle."""
        self._db.close()

    def try_catch_up_with_primary(self):
        """Delegate to ``try_catch_up_with_primary`` on the underlying database handle."""
        self._db.try_catch_up_with_primary()

    @property
    def closed(self) -> bool:
        """The ``is_closed`` flag of the underlying database handle."""
        return self._db.is_closed

    def stage_raw_put(self, key: bytes, value: bytes):
        """Stage a ``RevertablePut(key, value)`` on the op stack; nothing is written yet."""
        self._op_stack.append_op(RevertablePut(key, value))

    def stage_raw_delete(self, key: bytes, value: bytes):
        """
        Stage a ``RevertableDelete(key, value)`` on the op stack; nothing is written yet.

        NOTE(review): ``value`` is presumably the value currently stored under ``key`` so the
        delete can be undone - confirm against ``RevertableDelete``.
        """
        self._op_stack.append_op(RevertableDelete(key, value))

    def estimate_num_keys(self, column_family: Optional['rocksdb.ColumnFamilyHandle'] = None):
        """
        Return the ``rocksdb.estimate-num-keys`` property as an ``int``.

        :param column_family: forwarded to ``get_property`` as its second argument
        """
        return int(self._db.get_property(b'rocksdb.estimate-num-keys', column_family).decode())