move HubDB, delete leveldb.py

This commit is contained in:
Jack Robison 2022-01-06 12:28:15 -05:00
parent 24622103cf
commit 28be7d8993
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
8 changed files with 1381 additions and 1344 deletions

View file

@ -7,7 +7,7 @@ from lbry.error import ResolveCensoredError
from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage
if TYPE_CHECKING: if TYPE_CHECKING:
from lbry.wallet.server.leveldb import ResolveResult from lbry.wallet.server.db.common import ResolveResult
INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID)
NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND)

View file

@ -12,7 +12,6 @@ from lbry.wallet.server.util import cachedproperty, subclasses
from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_str, HASHX_LEN from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.daemon import Daemon, LBCDaemon from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
from lbry.wallet.server.block_processor import BlockProcessor from lbry.wallet.server.block_processor import BlockProcessor
@ -40,7 +39,6 @@ class Coin:
DAEMON = Daemon DAEMON = Daemon
BLOCK_PROCESSOR = BlockProcessor BLOCK_PROCESSOR = BlockProcessor
SESSION_MANAGER = LBRYSessionManager SESSION_MANAGER = LBRYSessionManager
DB = LevelDB
HEADER_VALUES = [ HEADER_VALUES = [
'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce' 'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce'
] ]
@ -243,7 +241,6 @@ class LBC(Coin):
SESSIONCLS = LBRYElectrumX SESSIONCLS = LBRYElectrumX
SESSION_MANAGER = LBRYSessionManager SESSION_MANAGER = LBRYSessionManager
DESERIALIZER = DeserializerSegWit DESERIALIZER = DeserializerSegWit
DB = LevelDB
NAME = "LBRY" NAME = "LBRY"
SHORTNAME = "LBC" SHORTNAME = "LBC"
NET = "mainnet" NET = "mainnet"

View file

@ -25,7 +25,7 @@ class DB_PREFIXES(enum.Enum):
reposted_claim = b'W' reposted_claim = b'W'
undo = b'M' undo = b'M'
claim_diff = b'Y' touched_or_deleted = b'Y'
tx = b'B' tx = b'B'
block_hash = b'C' block_hash = b'C'

View file

@ -1,4 +1,6 @@
import typing import typing
from typing import Optional
from lbry.error import ResolveCensoredError
CLAIM_TYPES = { CLAIM_TYPES = {
'stream': 1, 'stream': 1,
@ -451,3 +453,25 @@ class TrendingNotification(typing.NamedTuple):
height: int height: int
prev_amount: int prev_amount: int
new_amount: int new_amount: int
class UTXO(typing.NamedTuple):
tx_num: int
tx_pos: int
tx_hash: bytes
height: int
value: int
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]]
class ExpandedResolveResult(typing.NamedTuple):
stream: OptionalResolveResultOrError
channel: OptionalResolveResultOrError
repost: OptionalResolveResultOrError
reposted_channel: OptionalResolveResultOrError
class DBError(Exception):
"""Raised on general DB errors generally indicating corruption."""

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,227 @@
import struct
import rocksdb
from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
class RocksDBStore:
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
# Use snappy compression (the default)
self.path = path
self.secondary_path = secondary_path
self._max_open_files = max_open_files
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
# self.multi_get = self.db.multi_get
def get_options(self):
return rocksdb.Options(
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
max_open_files=self._max_open_files if not self.secondary_path else -1
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return RocksDBIterator(
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix if start is None and stop is None else None, include_key=include_key,
include_value=include_value
)
def write_batch(self, disable_wal: bool = False, sync: bool = False):
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
def close(self):
self.db.close()
self.db = None
@property
def closed(self) -> bool:
return self.db is None
def try_catch_up_with_primary(self):
self.db.try_catch_up_with_primary()
class RocksDBWriteBatch:
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
self.batch = rocksdb.WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
class RocksDBIterator:
"""An iterator for RocksDB."""
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev_k',
'reverse',
'include_start',
'include_stop'
]
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bytes = None, stop: bytes = None,
include_key: bool = True, include_value: bool = True, reverse: bool = False,
include_start: bool = True, include_stop: bool = False):
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
self.start = start
self.prefix = prefix
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev_k = None
self.reverse = reverse
self.include_start = include_start
self.include_stop = include_stop
def __iter__(self):
return self
def _check_stop_iteration(self, key: bytes):
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
raise StopIteration
elif self.start is not None and self.start > key[:len(self.start)]:
raise StopIteration
elif self.prefix is not None and not key.startswith(self.prefix):
raise StopIteration
def __next__(self):
if self.prev_k is not None:
self._check_stop_iteration(self.prev_k)
k, v = next(self.iterator)
self._check_stop_iteration(k)
self.prev_k = k
if self.include_key and self.include_value:
return k, v
elif self.include_key:
return k
return v
class PrefixDB:
"""
Base class for a revertable rocksdb database (a rocksdb db where each set of applied changes can be undone)
"""
UNDO_KEY_STRUCT = struct.Struct(b'>Q32s')
PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth
def unsafe_commit(self):
"""
Write staged changes to the database without keeping undo information
Changes written cannot be undone
"""
try:
if not len(self._op_stack):
return
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
finally:
self._op_stack.clear()
def commit(self, height: int, block_hash: bytes):
"""
Write changes for a block height to the database and keep undo information so that the changes can be reverted
"""
undo_ops = self._op_stack.get_undo_ops()
delete_undos = []
if height > self._max_undo_depth:
delete_undos.extend(self._db.iterator(
start=DB_PREFIXES.undo.value + self.PARTIAL_UNDO_KEY_STRUCT.pack(0),
stop=DB_PREFIXES.undo.value + self.PARTIAL_UNDO_KEY_STRUCT.pack(height - self._max_undo_depth),
include_value=False
))
try:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
for undo_to_delete in delete_undos:
batch_delete(undo_to_delete)
batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height, block_hash), undo_ops)
finally:
self._op_stack.clear()
def rollback(self, height: int, block_hash: bytes):
"""
Revert changes for a block height
"""
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height, block_hash)
undo_info = self._db.get(undo_key)
self._op_stack.apply_packed_undo_ops(undo_info)
try:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
# batch_delete(undo_key)
finally:
self._op_stack.clear()
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self._db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self._db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def close(self):
if not self._db.closed:
self._db.close()
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
@property
def closed(self) -> bool:
return self._db.closed
def stage_raw_put(self, key: bytes, value: bytes):
self._op_stack.append_op(RevertablePut(key, value))
def stage_raw_delete(self, key: bytes, value: bytes):
self._op_stack.append_op(RevertableDelete(key, value))

View file

@ -4,7 +4,7 @@ import array
import base64 import base64
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB from lbry.wallet.server.db.interface import RocksDBStore, PrefixDB
from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db.common import TrendingNotification
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name from lbry.schema.url import normalize_name
@ -206,14 +206,14 @@ class TxHashValue(NamedTuple):
tx_hash: bytes tx_hash: bytes
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}(tx_hash={self.tx_hash.hex()})" return f"{self.__class__.__name__}(tx_hash={self.tx_hash[::-1].hex()})"
class TxNumKey(NamedTuple): class TxNumKey(NamedTuple):
tx_hash: bytes tx_hash: bytes
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}(tx_hash={self.tx_hash.hex()})" return f"{self.__class__.__name__}(tx_hash={self.tx_hash[::-1].hex()})"
class TxNumValue(NamedTuple): class TxNumValue(NamedTuple):
@ -224,7 +224,7 @@ class TxKey(NamedTuple):
tx_hash: bytes tx_hash: bytes
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}(tx_hash={self.tx_hash.hex()})" return f"{self.__class__.__name__}(tx_hash={self.tx_hash[::-1].hex()})"
class TxValue(NamedTuple): class TxValue(NamedTuple):
@ -1423,7 +1423,7 @@ class HashXHistoryPrefixRow(PrefixRow):
class TouchedOrDeletedPrefixRow(PrefixRow): class TouchedOrDeletedPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_diff.value prefix = DB_PREFIXES.touched_or_deleted.value
key_struct = struct.Struct(b'>L') key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>LL') value_struct = struct.Struct(b'>LL')
key_part_lambdas = [ key_part_lambdas = [
@ -1682,6 +1682,9 @@ class TouchedHashXKey(NamedTuple):
class TouchedHashXValue(NamedTuple): class TouchedHashXValue(NamedTuple):
touched_hashXs: typing.List[bytes] touched_hashXs: typing.List[bytes]
def __str__(self):
return f"{self.__class__.__name__}(touched_hashXs=[{', '.join(map(lambda x: x.hex(), self.touched_hashXs))}])"
class TouchedHashXPrefixRow(PrefixRow): class TouchedHashXPrefixRow(PrefixRow):
prefix = DB_PREFIXES.touched_hashX.value prefix = DB_PREFIXES.touched_hashX.value

File diff suppressed because it is too large Load diff