prefix db

This commit is contained in:
Jack Robison 2021-08-13 20:02:42 -04:00 committed by Victor Shyba
parent 0c7be8975f
commit e212ce23e3
3 changed files with 111 additions and 65 deletions

View file

@ -451,9 +451,7 @@ class BlockProcessor:
signing_channel = self.db.get_claim_txo(signing_channel_hash) signing_channel = self.db.get_claim_txo(signing_channel_hash)
if signing_channel: if signing_channel:
raw_channel_tx = self.db.db.get( raw_channel_tx = self.db.prefix_db.tx.get(self.db.total_transactions[signing_channel.tx_num]).raw_tx
DB_PREFIXES.tx.value + self.db.total_transactions[signing_channel.tx_num]
)
channel_pub_key_bytes = None channel_pub_key_bytes = None
try: try:
if not signing_channel: if not signing_channel:
@ -1189,20 +1187,16 @@ class BlockProcessor:
add_claim_or_support = self._add_claim_or_support add_claim_or_support = self._add_claim_or_support
txs: List[Tuple[Tx, bytes]] = block.transactions txs: List[Tuple[Tx, bytes]] = block.transactions
self.db_op_stack.extend_ops([ self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
RevertablePut(*Prefixes.block_hash.pack_item(height, self.coin.header_hash(block.header))), self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
RevertablePut(*Prefixes.header.pack_item(height, block.header))
])
for tx, tx_hash in txs: for tx, tx_hash in txs:
spent_claims = {} spent_claims = {}
txos = Transaction(tx.raw).outputs txos = Transaction(tx.raw).outputs
self.db_op_stack.extend_ops([ self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,))
RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)), self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,))
RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)), self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,))
RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash))
])
# Spend the inputs # Spend the inputs
for txin in tx.inputs: for txin in tx.inputs:
@ -1211,7 +1205,6 @@ class BlockProcessor:
# spend utxo for address histories # spend utxo for address histories
hashX = spend_utxo(txin.prev_hash, txin.prev_idx) hashX = spend_utxo(txin.prev_hash, txin.prev_idx)
if hashX: if hashX:
# self._set_hashX_cache(hashX)
if tx_count not in self.hashXs_by_tx[hashX]: if tx_count not in self.hashXs_by_tx[hashX]:
self.hashXs_by_tx[hashX].append(tx_count) self.hashXs_by_tx[hashX].append(tx_count)
# spend claim/support txo # spend claim/support txo
@ -1439,7 +1432,7 @@ class BlockProcessor:
self._caught_up_event = caught_up_event self._caught_up_event = caught_up_event
try: try:
await self.db.open_dbs() await self.db.open_dbs()
self.db_op_stack = RevertableOpStack(self.db.db.get) self.db_op_stack = self.db.db_op_stack
self.height = self.db.db_height self.height = self.db.db_height
self.tip = self.db.db_tip self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count self.tx_count = self.db.db_tx_count

View file

@ -2,8 +2,10 @@ import typing
import struct import struct
import array import array
import base64 import base64
from typing import Union, Tuple, NamedTuple import plyvel
from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name from lbry.schema.url import normalize_name
ACTIVATED_CLAIM_TXO_TYPE = 1 ACTIVATED_CLAIM_TXO_TYPE = 1
@ -19,14 +21,14 @@ def length_prefix(key: str) -> bytes:
return len(key).to_bytes(1, byteorder='big') + key.encode() return len(key).to_bytes(1, byteorder='big') + key.encode()
_ROW_TYPES = {} ROW_TYPES = {}
class PrefixRowType(type): class PrefixRowType(type):
def __new__(cls, name, bases, kwargs): def __new__(cls, name, bases, kwargs):
klass = super().__new__(cls, name, bases, kwargs) klass = super().__new__(cls, name, bases, kwargs)
if name != "PrefixRow": if name != "PrefixRow":
_ROW_TYPES[klass.prefix] = klass ROW_TYPES[klass.prefix] = klass
return klass return klass
@ -36,6 +38,42 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct value_struct: struct.Struct
key_part_lambdas = [] key_part_lambdas = []
def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
def iterate(self, prefix=None, start=None, stop=None,
reverse: bool = False, include_key: bool = True, include_value: bool = True):
if prefix is not None:
prefix = self.pack_partial_key(*prefix)
if start is not None:
start = self.pack_partial_key(*start)
if stop is not None:
stop = self.pack_partial_key(*stop)
if include_key and include_value:
for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse):
yield self.unpack_key(k), self.unpack_value(v)
elif include_key:
for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False):
yield self.unpack_key(k)
elif include_value:
for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False):
yield self.unpack_value(v)
else:
raise RuntimeError
def get(self, *key_args):
v = self._db.get(self.pack_key(*key_args))
if v:
return self.unpack_value(v)
def stage_put(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))
def stage_delete(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args)))
@classmethod @classmethod
def pack_partial_key(cls, *args) -> bytes: def pack_partial_key(cls, *args) -> bytes:
return cls.prefix + cls.key_part_lambdas[len(args)](*args) return cls.prefix + cls.key_part_lambdas[len(args)](*args)
@ -1333,38 +1371,55 @@ class Prefixes:
touched_or_deleted = TouchedOrDeletedPrefixRow touched_or_deleted = TouchedOrDeletedPrefixRow
class PrefixDB:
def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
ROW_TYPES = { self.claim_to_support = ClaimToSupportPrefixRow(db, op_stack)
Prefixes.claim_to_support.prefix: Prefixes.claim_to_support, self.support_to_claim = SupportToClaimPrefixRow(db, op_stack)
Prefixes.support_to_claim.prefix: Prefixes.support_to_claim, self.claim_to_txo = ClaimToTXOPrefixRow(db, op_stack)
Prefixes.claim_to_txo.prefix: Prefixes.claim_to_txo, self.txo_to_claim = TXOToClaimPrefixRow(db, op_stack)
Prefixes.txo_to_claim.prefix: Prefixes.txo_to_claim, self.claim_to_channel = ClaimToChannelPrefixRow(db, op_stack)
Prefixes.claim_to_channel.prefix: Prefixes.claim_to_channel, self.channel_to_claim = ChannelToClaimPrefixRow(db, op_stack)
Prefixes.channel_to_claim.prefix: Prefixes.channel_to_claim, self.claim_short_id = ClaimShortIDPrefixRow(db, op_stack)
Prefixes.claim_short_id.prefix: Prefixes.claim_short_id, self.claim_expiration = ClaimExpirationPrefixRow(db, op_stack)
Prefixes.claim_expiration.prefix: Prefixes.claim_expiration, self.claim_takeover = ClaimTakeoverPrefixRow(db, op_stack)
Prefixes.claim_takeover.prefix: Prefixes.claim_takeover, self.pending_activation = PendingActivationPrefixRow(db, op_stack)
Prefixes.pending_activation.prefix: Prefixes.pending_activation, self.activated = ActivatedPrefixRow(db, op_stack)
Prefixes.activated.prefix: Prefixes.activated, self.active_amount = ActiveAmountPrefixRow(db, op_stack)
Prefixes.active_amount.prefix: Prefixes.active_amount, self.effective_amount = EffectiveAmountPrefixRow(db, op_stack)
Prefixes.effective_amount.prefix: Prefixes.effective_amount, self.repost = RepostPrefixRow(db, op_stack)
Prefixes.repost.prefix: Prefixes.repost, self.reposted_claim = RepostedPrefixRow(db, op_stack)
Prefixes.reposted_claim.prefix: Prefixes.reposted_claim, self.undo = UndoPrefixRow(db, op_stack)
Prefixes.undo.prefix: Prefixes.undo, self.utxo = UTXOPrefixRow(db, op_stack)
Prefixes.utxo.prefix: Prefixes.utxo, self.hashX_utxo = HashXUTXOPrefixRow(db, op_stack)
Prefixes.hashX_utxo.prefix: Prefixes.hashX_utxo, self.hashX_history = HashXHistoryPrefixRow(db, op_stack)
Prefixes.hashX_history.prefix: Prefixes.hashX_history, self.block_hash = BlockHashPrefixRow(db, op_stack)
Prefixes.block_hash.prefix: Prefixes.block_hash, self.tx_count = TxCountPrefixRow(db, op_stack)
Prefixes.tx_count.prefix: Prefixes.tx_count, self.tx_hash = TXHashPrefixRow(db, op_stack)
Prefixes.tx_hash.prefix: Prefixes.tx_hash, self.tx_num = TXNumPrefixRow(db, op_stack)
Prefixes.tx_num.prefix: Prefixes.tx_num, self.tx = TXPrefixRow(db, op_stack)
Prefixes.tx.prefix: Prefixes.tx, self.header = BlockHeaderPrefixRow(db, op_stack)
Prefixes.header.prefix: Prefixes.header self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack)
}
def commit(self):
try:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
finally:
self._op_stack.clear()
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
try: try:
return _ROW_TYPES[key[:1]].unpack_item(key, value) return ROW_TYPES[key[:1]].unpack_item(key, value)
except KeyError: except KeyError:
return key, value return key, value

View file

@ -34,7 +34,8 @@ from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.revertable import RevertableOpStack
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
from lbry.wallet.transaction import OutputScript from lbry.wallet.transaction import OutputScript
@ -111,6 +112,7 @@ class LevelDB:
self.logger.info(f'switching current directory to {env.db_dir}') self.logger.info(f'switching current directory to {env.db_dir}')
self.db = None self.db = None
self.prefix_db = None
self.hist_unflushed = defaultdict(partial(array.array, 'I')) self.hist_unflushed = defaultdict(partial(array.array, 'I'))
self.hist_unflushed_count = 0 self.hist_unflushed_count = 0
@ -415,30 +417,25 @@ class LevelDB:
return Prefixes.block_hash.unpack_value(v).block_hash return Prefixes.block_hash.unpack_value(v).block_hash
def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]:
v = self.db.get(Prefixes.claim_to_support.pack_key(claim_hash, tx_num, position)) v = self.prefix_db.claim_to_support.get(claim_hash, tx_num, position)
if v: return None if not v else v.amount
return Prefixes.claim_to_support.unpack_value(v).amount
def get_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: def get_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
assert claim_hash assert claim_hash
v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash)) return self.prefix_db.claim_to_txo.get(claim_hash)
if v:
return Prefixes.claim_to_txo.unpack_value(v)
def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int:
return sum( return sum(
Prefixes.active_amount.unpack_value(v).amount v.amount for v in self.prefix_db.active_amount.iterate(
for v in self.db.iterator(start=Prefixes.active_amount.pack_partial_key( start=(claim_hash, txo_type, 0), stop=(claim_hash, txo_type, height), include_key=False
claim_hash, txo_type, 0), stop=Prefixes.active_amount.pack_partial_key( )
claim_hash, txo_type, height), include_key=False)
) )
def get_active_amount_as_of_height(self, claim_hash: bytes, height: int) -> int: def get_active_amount_as_of_height(self, claim_hash: bytes, height: int) -> int:
for v in self.db.iterator( for v in self.prefix_db.active_amount.iterate(
start=Prefixes.active_amount.pack_partial_key(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height),
stop=Prefixes.active_amount.pack_partial_key(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height),
include_key=False, reverse=True): include_key=False, reverse=True):
return Prefixes.active_amount.unpack_value(v).amount return v.amount
return 0 return 0
def get_effective_amount(self, claim_hash: bytes, support_only=False) -> int: def get_effective_amount(self, claim_hash: bytes, support_only=False) -> int:
@ -448,14 +445,13 @@ class LevelDB:
return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1)
def get_url_effective_amount(self, name: str, claim_hash: bytes): def get_url_effective_amount(self, name: str, claim_hash: bytes):
for _k, _v in self.db.iterator(prefix=Prefixes.effective_amount.pack_partial_key(name)): for k, v in self.prefix_db.effective_amount.iterate(prefix=(name,)):
v = Prefixes.effective_amount.unpack_value(_v)
if v.claim_hash == claim_hash: if v.claim_hash == claim_hash:
return Prefixes.effective_amount.unpack_key(_k) return k
def get_claims_for_name(self, name): def get_claims_for_name(self, name):
claims = [] claims = []
prefix = Prefixes.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big') prefix = Prefixes.claim_short_id.pack_partial_key(name) + bytes([1])
for _k, _v in self.db.iterator(prefix=prefix): for _k, _v in self.db.iterator(prefix=prefix):
v = Prefixes.claim_short_id.unpack_value(_v) v = Prefixes.claim_short_id.unpack_value(_v)
claim_hash = self.get_claim_from_txo(v.tx_num, v.position).claim_hash claim_hash = self.get_claim_from_txo(v.tx_num, v.position).claim_hash
@ -465,7 +461,7 @@ class LevelDB:
def get_claims_in_channel_count(self, channel_hash) -> int: def get_claims_in_channel_count(self, channel_hash) -> int:
count = 0 count = 0
for _ in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False): for _ in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash,), include_key=False):
count += 1 count += 1
return count return count
@ -853,6 +849,8 @@ class LevelDB:
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
) )
self.db_op_stack = RevertableOpStack(self.db.get)
self.prefix_db = PrefixDB(self.db, self.db_op_stack)
if is_new: if is_new:
self.logger.info('created new db: %s', f'lbry-leveldb') self.logger.info('created new db: %s', f'lbry-leveldb')