diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 982ae3713..b10021d1e 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -451,9 +451,7 @@ class BlockProcessor: signing_channel = self.db.get_claim_txo(signing_channel_hash) if signing_channel: - raw_channel_tx = self.db.db.get( - DB_PREFIXES.tx.value + self.db.total_transactions[signing_channel.tx_num] - ) + raw_channel_tx = self.db.prefix_db.tx.get(self.db.total_transactions[signing_channel.tx_num]).raw_tx channel_pub_key_bytes = None try: if not signing_channel: @@ -1189,20 +1187,16 @@ class BlockProcessor: add_claim_or_support = self._add_claim_or_support txs: List[Tuple[Tx, bytes]] = block.transactions - self.db_op_stack.extend_ops([ - RevertablePut(*Prefixes.block_hash.pack_item(height, self.coin.header_hash(block.header))), - RevertablePut(*Prefixes.header.pack_item(height, block.header)) - ]) + self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),)) + self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,)) for tx, tx_hash in txs: spent_claims = {} txos = Transaction(tx.raw).outputs - self.db_op_stack.extend_ops([ - RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)), - RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)), - RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash)) - ]) + self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,)) + self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,)) + self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,)) # Spend the inputs for txin in tx.inputs: @@ -1211,7 +1205,6 @@ class BlockProcessor: # spend utxo for address histories hashX = spend_utxo(txin.prev_hash, txin.prev_idx) if hashX: - # self._set_hashX_cache(hashX) if tx_count not in self.hashXs_by_tx[hashX]: self.hashXs_by_tx[hashX].append(tx_count) # spend claim/support txo @@ -1439,7 +1432,7 @@ class BlockProcessor: self._caught_up_event = caught_up_event try: await self.db.open_dbs() - self.db_op_stack = RevertableOpStack(self.db.db.get) + self.db_op_stack = self.db.db_op_stack self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 8ed55e96f..10655138b 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -2,8 +2,10 @@ import typing import struct import array import base64 -from typing import Union, Tuple, NamedTuple +import plyvel +from typing import Union, Tuple, NamedTuple, Optional from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.schema.url import normalize_name ACTIVATED_CLAIM_TXO_TYPE = 1 @@ -19,14 +21,14 @@ def length_prefix(key: str) -> bytes: return len(key).to_bytes(1, byteorder='big') + key.encode() -_ROW_TYPES = {} +ROW_TYPES = {} class PrefixRowType(type): def __new__(cls, name, bases, kwargs): klass = super().__new__(cls, name, bases, kwargs) if name != "PrefixRow": - _ROW_TYPES[klass.prefix] = klass + ROW_TYPES[klass.prefix] = klass return klass @@ -36,6 +38,42 @@ class PrefixRow(metaclass=PrefixRowType): value_struct: struct.Struct key_part_lambdas = [] + def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack): + self._db = db + self._op_stack = op_stack + + def iterate(self, prefix=None, start=None, stop=None, + reverse: bool = False, include_key: bool = True, include_value: bool = True): + if prefix is not None: + prefix = self.pack_partial_key(*prefix) + if start is not None: + start = self.pack_partial_key(*start) + if stop is not None: + stop = self.pack_partial_key(*stop) + + if include_key and include_value: + for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse): + yield self.unpack_key(k), self.unpack_value(v) + elif include_key: + for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False): + yield self.unpack_key(k) + elif include_value: + for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False): + yield self.unpack_value(v) + else: + raise RuntimeError + + def get(self, *key_args): + v = self._db.get(self.pack_key(*key_args)) + if v: + return self.unpack_value(v) + + def stage_put(self, key_args=(), value_args=()): + self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))) + + def stage_delete(self, key_args=(), value_args=()): + self._op_stack.append_op(RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args))) + @classmethod def pack_partial_key(cls, *args) -> bytes: return cls.prefix + cls.key_part_lambdas[len(args)](*args) @@ -1333,38 +1371,55 @@ class Prefixes: touched_or_deleted = TouchedOrDeletedPrefixRow +class PrefixDB: + def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack): + self._db = db + self._op_stack = op_stack -ROW_TYPES = { - Prefixes.claim_to_support.prefix: Prefixes.claim_to_support, - Prefixes.support_to_claim.prefix: Prefixes.support_to_claim, - Prefixes.claim_to_txo.prefix: Prefixes.claim_to_txo, - Prefixes.txo_to_claim.prefix: Prefixes.txo_to_claim, - Prefixes.claim_to_channel.prefix: Prefixes.claim_to_channel, - Prefixes.channel_to_claim.prefix: Prefixes.channel_to_claim, - Prefixes.claim_short_id.prefix: Prefixes.claim_short_id, - Prefixes.claim_expiration.prefix: Prefixes.claim_expiration, - Prefixes.claim_takeover.prefix: Prefixes.claim_takeover, - Prefixes.pending_activation.prefix: Prefixes.pending_activation, - Prefixes.activated.prefix: Prefixes.activated, - Prefixes.active_amount.prefix: Prefixes.active_amount, - Prefixes.effective_amount.prefix: Prefixes.effective_amount, - Prefixes.repost.prefix: Prefixes.repost, - Prefixes.reposted_claim.prefix: Prefixes.reposted_claim, - Prefixes.undo.prefix: Prefixes.undo, - Prefixes.utxo.prefix: Prefixes.utxo, - Prefixes.hashX_utxo.prefix: Prefixes.hashX_utxo, - Prefixes.hashX_history.prefix: Prefixes.hashX_history, - Prefixes.block_hash.prefix: Prefixes.block_hash, - Prefixes.tx_count.prefix: Prefixes.tx_count, - Prefixes.tx_hash.prefix: Prefixes.tx_hash, - Prefixes.tx_num.prefix: Prefixes.tx_num, - Prefixes.tx.prefix: Prefixes.tx, - Prefixes.header.prefix: Prefixes.header -} + self.claim_to_support = ClaimToSupportPrefixRow(db, op_stack) + self.support_to_claim = SupportToClaimPrefixRow(db, op_stack) + self.claim_to_txo = ClaimToTXOPrefixRow(db, op_stack) + self.txo_to_claim = TXOToClaimPrefixRow(db, op_stack) + self.claim_to_channel = ClaimToChannelPrefixRow(db, op_stack) + self.channel_to_claim = ChannelToClaimPrefixRow(db, op_stack) + self.claim_short_id = ClaimShortIDPrefixRow(db, op_stack) + self.claim_expiration = ClaimExpirationPrefixRow(db, op_stack) + self.claim_takeover = ClaimTakeoverPrefixRow(db, op_stack) + self.pending_activation = PendingActivationPrefixRow(db, op_stack) + self.activated = ActivatedPrefixRow(db, op_stack) + self.active_amount = ActiveAmountPrefixRow(db, op_stack) + self.effective_amount = EffectiveAmountPrefixRow(db, op_stack) + self.repost = RepostPrefixRow(db, op_stack) + self.reposted_claim = RepostedPrefixRow(db, op_stack) + self.undo = UndoPrefixRow(db, op_stack) + self.utxo = UTXOPrefixRow(db, op_stack) + self.hashX_utxo = HashXUTXOPrefixRow(db, op_stack) + self.hashX_history = HashXHistoryPrefixRow(db, op_stack) + self.block_hash = BlockHashPrefixRow(db, op_stack) + self.tx_count = TxCountPrefixRow(db, op_stack) + self.tx_hash = TXHashPrefixRow(db, op_stack) + self.tx_num = TXNumPrefixRow(db, op_stack) + self.tx = TXPrefixRow(db, op_stack) + self.header = BlockHeaderPrefixRow(db, op_stack) + self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack) + + def commit(self): + try: + with self._db.write_batch(transaction=True) as batch: + batch_put = batch.put + batch_delete = batch.delete + + for staged_change in self._op_stack: + if staged_change.is_put: + batch_put(staged_change.key, staged_change.value) + else: + batch_delete(staged_change.key) + finally: + self._op_stack.clear() def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: try: - return _ROW_TYPES[key[:1]].unpack_item(key, value) + return ROW_TYPES[key[:1]].unpack_item(key, value) except KeyError: return key, value diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 88c6e8bae..40049732b 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -34,7 +34,8 @@ from lbry.wallet.server.tx import TxInput from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES -from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue +from lbry.wallet.server.db.revertable import RevertableOpStack +from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue from lbry.wallet.transaction import OutputScript @@ -111,6 +112,7 @@ class LevelDB: self.logger.info(f'switching current directory to {env.db_dir}') self.db = None + self.prefix_db = None self.hist_unflushed = defaultdict(partial(array.array, 'I')) self.hist_unflushed_count = 0 @@ -415,30 +417,25 @@ class LevelDB: return Prefixes.block_hash.unpack_value(v).block_hash def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: - v = self.db.get(Prefixes.claim_to_support.pack_key(claim_hash, tx_num, position)) - if v: - return Prefixes.claim_to_support.unpack_value(v).amount + v = self.prefix_db.claim_to_support.get(claim_hash, tx_num, position) + return None if not v else v.amount def get_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: assert claim_hash - v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash)) - if v: - return Prefixes.claim_to_txo.unpack_value(v) + return self.prefix_db.claim_to_txo.get(claim_hash) def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: return sum( - Prefixes.active_amount.unpack_value(v).amount - for v in self.db.iterator(start=Prefixes.active_amount.pack_partial_key( - claim_hash, txo_type, 0), stop=Prefixes.active_amount.pack_partial_key( - claim_hash, txo_type, height), include_key=False) + v.amount for v in self.prefix_db.active_amount.iterate( + start=(claim_hash, txo_type, 0), stop=(claim_hash, txo_type, height), include_key=False + ) ) def get_active_amount_as_of_height(self, claim_hash: bytes, height: int) -> int: - for v in self.db.iterator( - start=Prefixes.active_amount.pack_partial_key(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), - stop=Prefixes.active_amount.pack_partial_key(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height), + for v in self.prefix_db.active_amount.iterate( + start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height), include_key=False, reverse=True): - return Prefixes.active_amount.unpack_value(v).amount + return v.amount return 0 def get_effective_amount(self, claim_hash: bytes, support_only=False) -> int: @@ -448,14 +445,13 @@ class LevelDB: return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) def get_url_effective_amount(self, name: str, claim_hash: bytes): - for _k, _v in self.db.iterator(prefix=Prefixes.effective_amount.pack_partial_key(name)): - v = Prefixes.effective_amount.unpack_value(_v) + for k, v in self.prefix_db.effective_amount.iterate(prefix=(name,)): if v.claim_hash == claim_hash: - return Prefixes.effective_amount.unpack_key(_k) + return k def get_claims_for_name(self, name): claims = [] - prefix = Prefixes.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big') + prefix = Prefixes.claim_short_id.pack_partial_key(name) + bytes([1]) for _k, _v in self.db.iterator(prefix=prefix): v = Prefixes.claim_short_id.unpack_value(_v) claim_hash = self.get_claim_from_txo(v.tx_num, v.position).claim_hash @@ -465,7 +461,7 @@ class LevelDB: def get_claims_in_channel_count(self, channel_hash) -> int: count = 0 - for _ in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False): + for _ in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash,), include_key=False): count += 1 return count @@ -853,6 +849,8 @@ class LevelDB: lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 ) + self.db_op_stack = RevertableOpStack(self.db.get) + self.prefix_db = PrefixDB(self.db, self.db_op_stack) if is_new: self.logger.info('created new db: %s', f'lbry-leveldb')