block processor db refactoring

-access db through HubDB class, don't use plyvel.DB directly
-add channel count and support amount prefixes
This commit is contained in:
Jack Robison 2021-10-04 16:38:28 -04:00 committed by Victor Shyba
parent afa710dcb5
commit c2a5ff0ae3
8 changed files with 924 additions and 861 deletions

View file

@ -17,15 +17,11 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from lbry.wallet.server.udp import StatusServer
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
from lbry.wallet.server.db.revertable import RevertableOpStack
if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB
@ -153,6 +149,31 @@ class ChainError(Exception):
"""Raised on error processing blocks."""
class StagedClaimtrieItem(typing.NamedTuple):
name: str
normalized_name: str
claim_hash: bytes
amount: int
expiration_height: int
tx_num: int
position: int
root_tx_num: int
root_position: int
channel_signature_is_valid: bool
signing_hash: Optional[bytes]
reposted_claim_hash: Optional[bytes]
@property
def is_update(self) -> bool:
return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
def invalidate_signature(self) -> 'StagedClaimtrieItem':
return StagedClaimtrieItem(
self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
)
NAMESPACE = "wallet_server"
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
@ -257,6 +278,7 @@ class BlockProcessor:
self.pending_reposted = set()
self.pending_channel_counts = defaultdict(lambda: 0)
self.pending_support_amount_change = defaultdict(lambda: 0)
self.pending_channels = {}
self.amount_cache = {}
@ -266,6 +288,9 @@ class BlockProcessor:
self.claim_channels: Dict[bytes, bytes] = {}
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {}
async def claim_producer(self):
if self.db.db_height <= 1:
return
@ -319,7 +344,7 @@ class BlockProcessor:
self.logger.warning(
"applying extended claim expiration fork on claims accepted by, %i", self.height
)
await self.run_in_thread(self.db.apply_expiration_extension_fork)
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
# TODO: we shouldnt wait on the search index updating before advancing to the next block
if not self.db.first_sync:
await self.db.reload_blocking_filtering_streams()
@ -362,7 +387,6 @@ class BlockProcessor:
assert count > 0, count
for _ in range(count):
await self.backup_block()
await self.flush()
self.logger.info(f'backed up to height {self.height:,d}')
await self.db._read_claim_txos() # TODO: don't do this
@ -392,23 +416,12 @@ class BlockProcessor:
'resetting the prefetcher')
await self.prefetcher.reset_height(self.height)
# - Flushing
def flush_data(self):
"""The data for a flush. The lock must be taken."""
assert self.state_lock.locked()
return FlushData(self.height, self.tx_count, self.db_op_stack, self.tip)
async def flush(self):
def flush():
self.db.flush_dbs(self.flush_data())
self.db.write_db_state()
self.db.prefix_db.commit(self.height)
self.clear_after_advance_or_reorg()
await self.run_in_thread_with_lock(flush)
async def write_state(self):
def flush():
with self.db.db.write_batch(transaction=True) as batch:
self.db.write_db_state(batch)
self.db.assert_db_state()
await self.run_in_thread_with_lock(flush)
def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
@ -456,7 +469,11 @@ class BlockProcessor:
signing_channel = self.db.get_claim_txo(signing_channel_hash)
if signing_channel:
raw_channel_tx = self.db.prefix_db.tx.get(self.db.total_transactions[signing_channel.tx_num]).raw_tx
raw_channel_tx = self.db.prefix_db.tx.get(
self.db.prefix_db.tx_hash.get(
signing_channel.tx_num, deserialize_value=False
), deserialize_value=False
)
channel_pub_key_bytes = None
try:
if not signing_channel:
@ -503,11 +520,9 @@ class BlockProcessor:
root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position
activation = self.db.get_activation(prev_tx_num, prev_idx)
claim_name = previous_claim.name
self.db_op_stack.extend_ops(
StagedActivation(
self.get_remove_activate_ops(
ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, normalized_name,
previous_claim.amount
).get_remove_activate_ops()
)
previous_amount = previous_claim.amount
self.updated_claims.add(claim_hash)
@ -523,16 +538,101 @@ class BlockProcessor:
)
self.txo_to_claim[(tx_num, nout)] = pending
self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
self.get_add_claim_utxo_ops(pending)
def get_add_claim_utxo_ops(self, pending: StagedClaimtrieItem):
# claim tip by claim hash
self.db.prefix_db.claim_to_txo.stage_put(
(pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position,
pending.amount, pending.channel_signature_is_valid, pending.name)
)
# claim hash by txo
self.db.prefix_db.txo_to_claim.stage_put(
(pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name)
)
# claim expiration
self.db.prefix_db.claim_expiration.stage_put(
(pending.expiration_height, pending.tx_num, pending.position),
(pending.claim_hash, pending.normalized_name)
)
# short url resolution
for prefix_len in range(10):
self.db.prefix_db.claim_short_id.stage_put(
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
pending.root_tx_num, pending.root_position),
(pending.tx_num, pending.position)
)
if pending.signing_hash and pending.channel_signature_is_valid:
# channel by stream
self.db.prefix_db.claim_to_channel.stage_put(
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
)
# stream by channel
self.db.prefix_db.channel_to_claim.stage_put(
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
(pending.claim_hash,)
)
if pending.reposted_claim_hash:
self.db.prefix_db.repost.stage_put((pending.claim_hash,), (pending.reposted_claim_hash,))
self.db.prefix_db.reposted_claim.stage_put(
(pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
)
def get_remove_claim_utxo_ops(self, pending: StagedClaimtrieItem):
# claim tip by claim hash
self.db.prefix_db.claim_to_txo.stage_delete(
(pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position,
pending.amount, pending.channel_signature_is_valid, pending.name)
)
# claim hash by txo
self.db.prefix_db.txo_to_claim.stage_delete(
(pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name)
)
# claim expiration
self.db.prefix_db.claim_expiration.stage_delete(
(pending.expiration_height, pending.tx_num, pending.position),
(pending.claim_hash, pending.normalized_name)
)
# short url resolution
for prefix_len in range(10):
self.db.prefix_db.claim_short_id.stage_delete(
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
pending.root_tx_num, pending.root_position),
(pending.tx_num, pending.position)
)
if pending.signing_hash and pending.channel_signature_is_valid:
# channel by stream
self.db.prefix_db.claim_to_channel.stage_delete(
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
)
# stream by channel
self.db.prefix_db.channel_to_claim.stage_delete(
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
(pending.claim_hash,)
)
if pending.reposted_claim_hash:
self.db.prefix_db.repost.stage_delete((pending.claim_hash,), (pending.reposted_claim_hash,))
self.db.prefix_db.reposted_claim.stage_delete(
(pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
)
def _add_support(self, height: int, txo: 'Output', tx_num: int, nout: int):
supported_claim_hash = txo.claim_hash[::-1]
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
# print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}")
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
supported_claim_hash, tx_num, nout, txo.amount
).get_add_support_utxo_ops())
self.db.prefix_db.claim_to_support.stage_put((supported_claim_hash, tx_num, nout), (txo.amount,))
self.db.prefix_db.support_to_claim.stage_put((tx_num, nout), (supported_claim_hash,))
self.pending_support_amount_change[supported_claim_hash] += txo.amount
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output',
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]):
@ -542,7 +642,7 @@ class BlockProcessor:
self._add_support(height, txo, tx_num, nout)
def _spend_support_txo(self, height: int, txin: TxInput):
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
txin_num = self.get_pending_tx_num(txin.prev_hash)
activation = 0
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
@ -561,17 +661,17 @@ class BlockProcessor:
if 0 < activation < self.height + 1:
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
if supported_name is not None and activation > 0:
self.db_op_stack.extend_ops(StagedActivation(
self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
support_amount
).get_remove_activate_ops())
)
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops())
self.db.prefix_db.claim_to_support.stage_delete((spent_support, txin_num, txin.prev_idx), (support_amount,))
self.db.prefix_db.support_to_claim.stage_delete((txin_num, txin.prev_idx), (spent_support,))
self.pending_support_amount_change[spent_support] -= support_amount
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool:
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
txin_num = self.get_pending_tx_num(txin.prev_hash)
if (txin_num, txin.prev_idx) in self.txo_to_claim:
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
else:
@ -593,7 +693,7 @@ class BlockProcessor:
self.pending_channel_counts[spent.signing_hash] -= 1
spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name)
# print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops())
self.get_remove_claim_utxo_ops(spent)
return True
def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims):
@ -633,9 +733,31 @@ class BlockProcessor:
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
self._invalidate_channel_signatures(claim_hash)
def _get_invalidate_signature_ops(self, pending: StagedClaimtrieItem):
if not pending.signing_hash:
return
self.db.prefix_db.claim_to_channel.stage_delete(
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
)
if pending.channel_signature_is_valid:
self.db.prefix_db.channel_to_claim.stage_delete(
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
(pending.claim_hash,)
)
self.db.prefix_db.claim_to_txo.stage_delete(
(pending.claim_hash,),
(pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount,
pending.channel_signature_is_valid, pending.name)
)
self.db.prefix_db.claim_to_txo.stage_put(
(pending.claim_hash,),
(pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount,
False, pending.name)
)
def _invalidate_channel_signatures(self, claim_hash: bytes):
for k, signed_claim_hash in self.db.db.iterator(
prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):
for (signed_claim_hash, ) in self.db.prefix_db.channel_to_claim.iterate(
prefix=(claim_hash, ), include_key=False):
if signed_claim_hash in self.abandoned_claims or signed_claim_hash in self.expired_claim_hashes:
continue
# there is no longer a signing channel for this claim as of this block
@ -657,12 +779,12 @@ class BlockProcessor:
claim = self._make_pending_claim_txo(signed_claim_hash)
self.signatures_changed.add(signed_claim_hash)
self.pending_channel_counts[claim_hash] -= 1
self.db_op_stack.extend_ops(claim.get_invalidate_signature_ops())
self._get_invalidate_signature_ops(claim)
for staged in list(self.txo_to_claim.values()):
needs_invalidate = staged.claim_hash not in self.doesnt_have_valid_signature
if staged.signing_hash == claim_hash and needs_invalidate:
self.db_op_stack.extend_ops(staged.get_invalidate_signature_ops())
self._get_invalidate_signature_ops(staged)
self.txo_to_claim[self.claim_hash_to_txo[staged.claim_hash]] = staged.invalidate_signature()
self.signatures_changed.add(staged.claim_hash)
self.pending_channel_counts[claim_hash] -= 1
@ -758,6 +880,30 @@ class BlockProcessor:
support_amount = self._get_pending_supported_amount(claim_hash, height=height)
return claim_amount + support_amount
def get_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int,
activation_height: int, name: str, amount: int):
self.db.prefix_db.activated.stage_put(
(txo_type, tx_num, position), (activation_height, claim_hash, name)
)
self.db.prefix_db.pending_activation.stage_put(
(activation_height, txo_type, tx_num, position), (claim_hash, name)
)
self.db.prefix_db.active_amount.stage_put(
(claim_hash, txo_type, activation_height, tx_num, position), (amount,)
)
def get_remove_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int,
activation_height: int, name: str, amount: int):
self.db.prefix_db.activated.stage_delete(
(txo_type, tx_num, position), (activation_height, claim_hash, name)
)
self.db.prefix_db.pending_activation.stage_delete(
(activation_height, txo_type, tx_num, position), (claim_hash, name)
)
self.db.prefix_db.active_amount.stage_delete(
(claim_hash, txo_type, activation_height, tx_num, position), (amount,)
)
def _get_takeover_ops(self, height: int):
# cache for controlling claims as of the previous block
@ -779,7 +925,7 @@ class BlockProcessor:
future_activations = defaultdict(dict)
def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int,
amount: int, is_support: bool) -> List['RevertableOp']:
amount: int, is_support: bool):
controlling = get_controlling(name)
nothing_is_controlling = not controlling
staged_is_controlling = False if not controlling else claim_hash == controlling.claim_hash
@ -812,10 +958,10 @@ class BlockProcessor:
))
if is_support:
self.possible_future_support_txos_by_claim_hash[claim_hash].append((tx_num, nout))
return StagedActivation(
self.get_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout,
height + delay, name, amount
).get_activate_ops()
)
# determine names needing takeover/deletion due to controlling claims being abandoned
# and add ops to deactivate abandoned claims
@ -827,11 +973,9 @@ class BlockProcessor:
activation = self.db.get_activation(staged.tx_num, staged.position)
if activation > 0: # db returns -1 for non-existent txos
# removed queued future activation from the db
self.db_op_stack.extend_ops(
StagedActivation(
self.get_remove_activate_ops(
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
activation, staged.normalized_name, staged.amount
).get_remove_activate_ops()
)
else:
# it hadn't yet been activated
@ -868,10 +1012,10 @@ class BlockProcessor:
prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position)
if height < prev_activation or prev_activation < 0:
is_delayed = True
self.db_op_stack.extend_ops(get_delayed_activate_ops(
get_delayed_activate_ops(
staged.normalized_name, staged.claim_hash, is_delayed, tx_num, nout, staged.amount,
is_support=False
))
)
# and the supports
for (tx_num, nout), (claim_hash, amount) in self.support_txo_to_claim.items():
@ -889,9 +1033,9 @@ class BlockProcessor:
v = supported_claim_info
name = v.normalized_name
staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position)
self.db_op_stack.extend_ops(get_delayed_activate_ops(
get_delayed_activate_ops(
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
))
)
# add the activation/delayed-activation ops
for activated, activated_txos in activated_at_height.items():
@ -962,7 +1106,9 @@ class BlockProcessor:
if not has_candidate:
# remove name takeover entry, the name is now unclaimed
controlling = get_controlling(need_takeover)
self.db_op_stack.extend_ops(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
self.db.prefix_db.claim_takeover.stage_delete(
(need_takeover,), (controlling.claim_hash, controlling.height)
)
# scan for possible takeovers out of the accumulated activations, of these make sure there
# aren't any future activations for the taken over names with yet higher amounts, if there are
@ -973,7 +1119,7 @@ class BlockProcessor:
# upon the delayed activation of B, we need to detect to activate C and make it take over early instead
claim_exists = {}
for activated, activated_claim_txo in self.db.get_future_activated(height):
for activated, activated_claim_txo in self.db.get_future_activated(height).items():
# uses the pending effective amount for the future activation height, not the current height
future_amount = self._get_pending_claim_amount(
activated.normalized_name, activated.claim_hash, activated_claim_txo.height + 1
@ -1060,32 +1206,32 @@ class BlockProcessor:
break
assert None not in (amount, activation)
# update the claim that's activating early
self.db_op_stack.extend_ops(
StagedActivation(
self.get_remove_activate_ops(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, activation, name, amount
).get_remove_activate_ops() + \
StagedActivation(
)
self.get_activate_ops(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, height, name, amount
).get_activate_ops()
)
for (k, amount) in activate_in_future[name][winning_including_future_activations]:
txo = (k.tx_num, k.position)
if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]:
self.db_op_stack.extend_ops(
StagedActivation(
self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
k.position, k.height, name, amount
).get_remove_activate_ops() + \
StagedActivation(
)
self.get_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
k.position, height, name, amount
).get_activate_ops()
)
self.taken_over_names.add(name)
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
if controlling:
self.db.prefix_db.claim_takeover.stage_delete(
(name,), (controlling.claim_hash, controlling.height)
)
self.db.prefix_db.claim_takeover.stage_put((name,), (winning_including_future_activations, height))
self.touched_claim_hashes.add(winning_including_future_activations)
if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash)
@ -1106,20 +1252,20 @@ class BlockProcessor:
if previous_pending_activate.height > height:
# the claim had a pending activation in the future, move it to now
if tx_num < self.tx_count:
self.db_op_stack.extend_ops(
StagedActivation(
self.get_remove_activate_ops(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, previous_pending_activate.height, name, amount
).get_remove_activate_ops()
)
self.db_op_stack.extend_ops(
StagedActivation(
self.get_activate_ops(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, height, name, amount
).get_activate_ops()
)
self.taken_over_names.add(name)
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
if controlling:
self.db.prefix_db.claim_takeover.stage_delete(
(name,), (controlling.claim_hash, controlling.height)
)
self.db.prefix_db.claim_takeover.stage_put((name,), (winning_claim_hash, height))
if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning_claim_hash)
@ -1147,7 +1293,11 @@ class BlockProcessor:
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
self.taken_over_names.add(name)
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning, height, controlling))
if controlling:
self.db.prefix_db.claim_takeover.stage_delete(
(name,), (controlling.claim_hash, controlling.height)
)
self.db.prefix_db.claim_takeover.stage_put((name,), (winning, height))
if controlling:
self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning)
@ -1185,6 +1335,15 @@ class BlockProcessor:
)
)
# update support amount totals
for supported_claim, amount in self.pending_support_amount_change.items():
existing = self.db.prefix_db.support_amount.get(supported_claim)
total = amount
if existing is not None:
total += existing.amount
self.db.prefix_db.support_amount.stage_delete((supported_claim,), existing)
self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,))
# use the cumulative changes to update bid ordered resolve
for removed in self.removed_claim_hashes:
removed_claim = self.db.get_claim_txo(removed)
@ -1193,10 +1352,9 @@ class BlockProcessor:
removed_claim.normalized_name, removed
)
if amt:
self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
removed_claim.normalized_name, amt.effective_amount, amt.tx_num,
amt.position, removed
))
self.db.prefix_db.effective_amount.stage_delete(
(removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,)
)
for touched in self.touched_claim_hashes:
prev_effective_amount = 0
@ -1208,10 +1366,10 @@ class BlockProcessor:
claim_amount_info = self.db.get_url_effective_amount(name, touched)
if claim_amount_info:
prev_effective_amount = claim_amount_info.effective_amount
self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
claim_amount_info.position, touched
))
self.db.prefix_db.effective_amount.stage_delete(
(name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
claim_amount_info.position), (touched,)
)
else:
v = self.db.get_claim_txo(touched)
if not v:
@ -1220,10 +1378,8 @@ class BlockProcessor:
amt = self.db.get_url_effective_amount(name, touched)
if amt:
prev_effective_amount = amt.effective_amount
self.db_op_stack.extend_ops(
get_remove_effective_amount_ops(
name, amt.effective_amount, amt.tx_num, amt.position, touched
)
self.db.prefix_db.effective_amount.stage_delete(
(name, prev_effective_amount, amt.tx_num, amt.position), (touched,)
)
if (name, touched) in self.activated_claim_amount_by_name_and_hash:
@ -1242,12 +1398,18 @@ class BlockProcessor:
touched.hex(), height, False, prev_effective_amount, support_amount
)
new_effective_amount = self._get_pending_effective_amount(name, touched)
self.db_op_stack.extend_ops(
get_add_effective_amount_ops(
name, new_effective_amount, tx_num, position, touched
)
self.db.prefix_db.effective_amount.stage_put(
(name, new_effective_amount, tx_num, position), (touched,)
)
for channel_hash, count in self.pending_channel_counts.items():
if count != 0:
channel_count_val = self.db.prefix_db.channel_count.get(channel_hash)
channel_count = 0 if not channel_count_val else channel_count_val.count
if channel_count_val is not None:
self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,))
self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,))
self.touched_claim_hashes.update(
{k for k in self.pending_reposted if k not in self.removed_claim_hashes}
)
@ -1319,9 +1481,8 @@ class BlockProcessor:
for abandoned_claim_hash, (tx_num, nout, normalized_name) in abandoned_channels.items():
# print(f"\tabandon {normalized_name} {abandoned_claim_hash.hex()} {tx_num} {nout}")
self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name)
self.db.total_transactions.append(tx_hash)
self.db.transaction_num_mapping[tx_hash] = tx_count
self.pending_transactions[tx_count] = tx_hash
self.pending_transaction_num_mapping[tx_hash] = tx_count
tx_count += 1
# handle expired claims
@ -1333,43 +1494,53 @@ class BlockProcessor:
# update effective amount and update sets of touched and deleted claims
self._get_cumulative_update_ops(height)
self.db_op_stack.append_op(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
for hashX, new_history in self.hashXs_by_tx.items():
if not new_history:
continue
self.db_op_stack.append_op(
RevertablePut(
*Prefixes.hashX_history.pack_item(
hashX, height, new_history
)
)
)
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
self.tx_count = tx_count
self.db.tx_counts.append(self.tx_count)
cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit
if height >= cached_max_reorg_depth:
self.db_op_stack.append_op(
RevertablePut(
*Prefixes.touched_or_deleted.pack_item(
height, self.touched_claim_hashes, self.removed_claim_hashes
)
)
)
self.db_op_stack.append_op(
RevertablePut(
*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())
)
self.db.prefix_db.touched_or_deleted.stage_put(
key_args=(height,), value_args=(self.touched_claim_hashes, self.removed_claim_hashes)
)
self.height = height
self.db.headers.append(block.header)
self.tip = self.coin.header_hash(block.header)
min_height = self.db.min_undo_height(self.db.db_height)
if min_height > 0: # delete undos for blocks deep enough they can't be reorged
undo_to_delete = list(self.db.prefix_db.undo.iterate(start=(0,), stop=(min_height,)))
for (k, v) in undo_to_delete:
self.db.prefix_db.undo.stage_delete((k,), (v,))
touched_or_deleted_to_delete = list(self.db.prefix_db.touched_or_deleted.iterate(
start=(0,), stop=(min_height,))
)
for (k, v) in touched_or_deleted_to_delete:
self.db.prefix_db.touched_or_deleted.stage_delete(k, v)
self.db.fs_height = self.height
self.db.fs_tx_count = self.tx_count
self.db.hist_flush_count += 1
self.db.hist_unflushed_count = 0
self.db.utxo_flush_count = self.db.hist_flush_count
self.db.db_height = self.height
self.db.db_tx_count = self.tx_count
self.db.db_tip = self.tip
self.db.last_flush_tx_count = self.db.fs_tx_count
now = time.time()
self.db.wall_time += now - self.db.last_flush
self.db.last_flush = now
self.db.write_db_state()
def clear_after_advance_or_reorg(self):
self.db_op_stack.clear()
self.txo_to_claim.clear()
self.claim_hash_to_txo.clear()
self.support_txos_by_claim.clear()
@ -1399,59 +1570,87 @@ class BlockProcessor:
self.pending_channel_counts.clear()
self.updated_claims.clear()
self.taken_over_names.clear()
self.pending_transaction_num_mapping.clear()
self.pending_transactions.clear()
self.pending_support_amount_change.clear()
async def backup_block(self):
# self.db.assert_flushed(self.flush_data())
self.logger.info("backup block %i", self.height)
# Check and update self.tip
undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
if undo_ops is None:
raise ChainError(f'no undo information found for height {self.height:,d}')
self.db_op_stack.append_op(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
self.db_op_stack.apply_packed_undo_ops(undo_ops)
touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes)
assert len(self.db.prefix_db._op_stack) == 0
touched_and_deleted = self.db.prefix_db.touched_or_deleted.get(self.height)
self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims)
self.removed_claims_to_send_es.difference_update(touched_and_deleted.touched_claims)
self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims)
# self.db.assert_flushed(self.flush_data())
self.logger.info("backup block %i", self.height)
# Check and update self.tip
self.db.headers.pop()
self.db.tx_counts.pop()
self.tip = self.coin.header_hash(self.db.headers[-1])
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
self.db.transaction_num_mapping.pop(self.db.total_transactions.pop())
self.tx_count -= 1
self.tx_count = self.db.tx_counts[-1]
self.height -= 1
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched_hashXs.discard(None)
assert self.height < self.db.db_height
assert not self.db.hist_unflushed
start_time = time.time()
tx_delta = self.tx_count - self.db.last_flush_tx_count
###
self.db.fs_tx_count = self.tx_count
# Truncate header_mc: header count is 1 more than the height.
self.db.header_mc.truncate(self.height + 1)
###
# Not certain this is needed, but it doesn't hurt
self.db.hist_flush_count += 1
while self.db.fs_height > self.height:
self.db.fs_height -= 1
self.db.utxo_flush_count = self.db.hist_flush_count
self.db.db_height = self.height
self.db.db_tx_count = self.tx_count
self.db.db_tip = self.tip
# Flush state last as it reads the wall time.
now = time.time()
self.db.wall_time += now - self.db.last_flush
self.db.last_flush = now
self.db.last_flush_tx_count = self.db.fs_tx_count
await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1)
self.clear_after_advance_or_reorg()
elapsed = self.db.last_flush - start_time
self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
f'Height {self.height:,d} txs: {self.tx_count:,d} ({tx_delta:+,d})')
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
hashX = self.coin.hashX_from_script(txout.pk_script)
if hashX:
self.touched_hashXs.add(hashX)
self.utxo_cache[(tx_hash, nout)] = (hashX, txout.value)
self.db_op_stack.extend_ops([
RevertablePut(
*Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value)
),
RevertablePut(
*Prefixes.hashX_utxo.pack_item(tx_hash[:4], tx_num, nout, hashX)
)
])
self.db.prefix_db.utxo.stage_put((hashX, tx_num, nout), (txout.value,))
self.db.prefix_db.hashX_utxo.stage_put((tx_hash[:4], tx_num, nout), (hashX,))
return hashX
def get_pending_tx_num(self, tx_hash: bytes) -> int:
if tx_hash in self.pending_transaction_num_mapping:
return self.pending_transaction_num_mapping[tx_hash]
else:
return self.db.prefix_db.tx_num.get(tx_hash).tx_num
def spend_utxo(self, tx_hash: bytes, nout: int):
hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))
txin_num = self.db.transaction_num_mapping[tx_hash]
hdb_key = Prefixes.hashX_utxo.pack_key(tx_hash[:4], txin_num, nout)
if not hashX:
hashX = self.db.db.get(hdb_key)
txin_num = self.get_pending_tx_num(tx_hash)
if not hashX:
hashX_value = self.db.prefix_db.hashX_utxo.get(tx_hash[:4], txin_num, nout)
if not hashX_value:
return
udb_key = Prefixes.utxo.pack_key(hashX, txin_num, nout)
utxo_value_packed = self.db.db.get(udb_key)
if utxo_value_packed is None:
hashX = hashX_value.hashX
utxo_value = self.db.prefix_db.utxo.get(hashX, txin_num, nout)
if not utxo_value:
self.logger.warning(
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX)
)
@ -1459,18 +1658,13 @@ class BlockProcessor:
f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}"
)
self.touched_hashXs.add(hashX)
self.db_op_stack.extend_ops([
RevertableDelete(hdb_key, hashX),
RevertableDelete(udb_key, utxo_value_packed)
])
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), hashX_value)
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), utxo_value)
return hashX
elif amount is not None:
udb_key = Prefixes.utxo.pack_key(hashX, txin_num, nout)
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), (hashX,))
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), (amount,))
self.touched_hashXs.add(hashX)
self.db_op_stack.extend_ops([
RevertableDelete(hdb_key, hashX),
RevertableDelete(udb_key, Prefixes.utxo.pack_value(amount))
])
return hashX
async def _process_prefetched_blocks(self):
@ -1494,7 +1688,15 @@ class BlockProcessor:
# Flush everything but with first_sync->False state.
first_sync = self.db.first_sync
self.db.first_sync = False
await self.write_state()
def flush():
assert len(self.db.prefix_db._op_stack) == 0
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
self.db.assert_db_state()
await self.run_in_thread_with_lock(flush)
if first_sync:
self.logger.info(f'{lbry.__version__} synced to '
f'height {self.height:,d}, halting here.')
@ -1516,7 +1718,6 @@ class BlockProcessor:
self._caught_up_event = caught_up_event
try:
await self.db.open_dbs()
self.db_op_stack = self.db.db_op_stack
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count

View file

@ -37,4 +37,5 @@ class DB_PREFIXES(enum.Enum):
hashx_utxo = b'h'
hashx_history = b'x'
db_state = b's'
trending_spike = b't'
channel_count = b'Z'
support_amount = b'a'

View file

@ -1,258 +0,0 @@
import typing
from typing import Optional
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp
from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow
from lbry.wallet.server.db.prefixes import RepostPrefixRow, RepostedPrefixRow
def length_encoded_name(name: str) -> bytes:
encoded = name.encode('utf-8')
return len(encoded).to_bytes(2, byteorder='big') + encoded
class StagedClaimtrieSupport(typing.NamedTuple):
claim_hash: bytes
tx_num: int
position: int
amount: int
def _get_add_remove_support_utxo_ops(self, add=True):
"""
get a list of revertable operations to add or spend a support txo to the key: value database
:param add: if true use RevertablePut operations, otherwise use RevertableDelete
:return:
"""
op = RevertablePut if add else RevertableDelete
return [
op(
*Prefixes.claim_to_support.pack_item(self.claim_hash, self.tx_num, self.position, self.amount)
),
op(
*Prefixes.support_to_claim.pack_item(self.tx_num, self.position, self.claim_hash)
)
]
def get_add_support_utxo_ops(self) -> typing.List[RevertableOp]:
return self._get_add_remove_support_utxo_ops(add=True)
def get_spend_support_txo_ops(self) -> typing.List[RevertableOp]:
return self._get_add_remove_support_utxo_ops(add=False)
class StagedActivation(typing.NamedTuple):
txo_type: int
claim_hash: bytes
tx_num: int
position: int
activation_height: int
name: str
amount: int
def _get_add_remove_activate_ops(self, add=True):
op = RevertablePut if add else RevertableDelete
# print(f"\t{'add' if add else 'remove'} {'claim' if self.txo_type == ACTIVATED_CLAIM_TXO_TYPE else 'support'},"
# f" {self.tx_num}, {self.position}, activation={self.activation_height}, {self.name}, "
# f"amount={self.amount}")
return [
op(
*Prefixes.activated.pack_item(
self.txo_type, self.tx_num, self.position, self.activation_height, self.claim_hash, self.name
)
),
op(
*Prefixes.pending_activation.pack_item(
self.activation_height, self.txo_type, self.tx_num, self.position,
self.claim_hash, self.name
)
),
op(
*Prefixes.active_amount.pack_item(
self.claim_hash, self.txo_type, self.activation_height, self.tx_num, self.position, self.amount
)
)
]
def get_activate_ops(self) -> typing.List[RevertableOp]:
return self._get_add_remove_activate_ops(add=True)
def get_remove_activate_ops(self) -> typing.List[RevertableOp]:
return self._get_add_remove_activate_ops(add=False)
def get_remove_name_ops(name: str, claim_hash: bytes, height: int) -> typing.List[RevertableDelete]:
return [
RevertableDelete(
*Prefixes.claim_takeover.pack_item(
name, claim_hash, height
)
)
]
def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int,
previous_winning: Optional[ClaimTakeoverValue]):
if previous_winning:
return [
RevertableDelete(
*Prefixes.claim_takeover.pack_item(
name, previous_winning.claim_hash, previous_winning.height
)
),
RevertablePut(
*Prefixes.claim_takeover.pack_item(
name, claim_hash, takeover_height
)
)
]
return [
RevertablePut(
*Prefixes.claim_takeover.pack_item(
name, claim_hash, takeover_height
)
)
]
def get_remove_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
return [
RevertableDelete(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash))
]
def get_add_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
return [
RevertablePut(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash))
]
class StagedClaimtrieItem(typing.NamedTuple):
name: str
normalized_name: str
claim_hash: bytes
amount: int
expiration_height: int
tx_num: int
position: int
root_tx_num: int
root_position: int
channel_signature_is_valid: bool
signing_hash: Optional[bytes]
reposted_claim_hash: Optional[bytes]
@property
def is_update(self) -> bool:
return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
def _get_add_remove_claim_utxo_ops(self, add=True):
"""
get a list of revertable operations to add or spend a claim txo to the key: value database
:param add: if true use RevertablePut operations, otherwise use RevertableDelete
:return:
"""
op = RevertablePut if add else RevertableDelete
ops = [
# claim tip by claim hash
op(
*Prefixes.claim_to_txo.pack_item(
self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position,
self.amount, self.channel_signature_is_valid, self.name
)
),
# claim hash by txo
op(
*Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.normalized_name)
),
# claim expiration
op(
*Prefixes.claim_expiration.pack_item(
self.expiration_height, self.tx_num, self.position, self.claim_hash,
self.normalized_name
)
),
# short url resolution
]
ops.extend([
op(
*Prefixes.claim_short_id.pack_item(
self.normalized_name, self.claim_hash.hex()[:prefix_len + 1], self.root_tx_num, self.root_position,
self.tx_num, self.position
)
) for prefix_len in range(10)
])
if self.signing_hash and self.channel_signature_is_valid:
ops.extend([
# channel by stream
op(
*Prefixes.claim_to_channel.pack_item(
self.claim_hash, self.tx_num, self.position, self.signing_hash
)
),
# stream by channel
op(
*Prefixes.channel_to_claim.pack_item(
self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash
)
)
])
if self.reposted_claim_hash:
ops.extend([
op(
*Prefixes.repost.pack_item(self.claim_hash, self.reposted_claim_hash)
),
op(
*Prefixes.reposted_claim.pack_item(
self.reposted_claim_hash, self.tx_num, self.position, self.claim_hash
)
),
])
return ops
def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]:
return self._get_add_remove_claim_utxo_ops(add=True)
def get_spend_claim_txo_ops(self) -> typing.List[RevertableOp]:
return self._get_add_remove_claim_utxo_ops(add=False)
def get_invalidate_signature_ops(self):
if not self.signing_hash:
return []
ops = [
RevertableDelete(
*Prefixes.claim_to_channel.pack_item(
self.claim_hash, self.tx_num, self.position, self.signing_hash
)
)
]
if self.channel_signature_is_valid:
ops.extend([
# delete channel_to_claim/claim_to_channel
RevertableDelete(
*Prefixes.channel_to_claim.pack_item(
self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash
)
),
# update claim_to_txo with channel_signature_is_valid=False
RevertableDelete(
*Prefixes.claim_to_txo.pack_item(
self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position,
self.amount, self.channel_signature_is_valid, self.name
)
),
RevertablePut(
*Prefixes.claim_to_txo.pack_item(
self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position,
self.amount, False, self.name
)
)
])
return ops
def invalidate_signature(self) -> 'StagedClaimtrieItem':
return StagedClaimtrieItem(
self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
)

103
lbry/wallet/server/db/db.py Normal file
View file

@ -0,0 +1,103 @@
import struct
from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack
class KeyValueStorage:
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
raise NotImplemented()
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
raise NotImplemented()
def write_batch(self, transaction: bool = False):
raise NotImplemented()
def close(self):
raise NotImplemented()
@property
def closed(self) -> bool:
raise NotImplemented()
class PrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: KeyValueStorage, unsafe_prefixes=None):
self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
def unsafe_commit(self):
"""
Write staged changes to the database without keeping undo information
Changes written cannot be undone
"""
try:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
finally:
self._op_stack.clear()
def commit(self, height: int):
"""
Write changes for a block height to the database and keep undo information so that the changes can be reverted
"""
undo_ops = self._op_stack.get_undo_ops()
try:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops)
finally:
self._op_stack.clear()
def rollback(self, height: int):
"""
Revert changes for a block height
"""
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
try:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
batch_delete(undo_key)
finally:
self._op_stack.clear()
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self._db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self._db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def close(self):
if not self._db.closed:
self._db.close()
@property
def closed(self):
return self._db.closed

View file

@ -2,9 +2,9 @@ import typing
import struct
import array
import base64
import plyvel
from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name
@ -38,13 +38,13 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct
key_part_lambdas = []
def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack):
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
def iterate(self, prefix=None, start=None, stop=None,
reverse: bool = False, include_key: bool = True, include_value: bool = True,
fill_cache: bool = True):
fill_cache: bool = True, deserialize_key: bool = True, deserialize_value: bool = True):
if not prefix and not start and not stop:
prefix = ()
if prefix is not None:
@ -54,25 +54,36 @@ class PrefixRow(metaclass=PrefixRowType):
if stop is not None:
stop = self.pack_partial_key(*stop)
if deserialize_key:
key_getter = lambda k: self.unpack_key(k)
else:
key_getter = lambda k: k
if deserialize_value:
value_getter = lambda v: self.unpack_value(v)
else:
value_getter = lambda v: v
if include_key and include_value:
for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse,
fill_cache=fill_cache):
yield self.unpack_key(k), self.unpack_value(v)
yield key_getter(k), value_getter(v)
elif include_key:
for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False,
fill_cache=fill_cache):
yield self.unpack_key(k)
yield key_getter(k)
elif include_value:
for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False,
fill_cache=fill_cache):
yield self.unpack_value(v)
yield value_getter(v)
else:
raise RuntimeError
for _ in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False,
include_value=False, fill_cache=fill_cache):
yield None
def get(self, *key_args, fill_cache=True):
def get(self, *key_args, fill_cache=True, deserialize_value=True):
v = self._db.get(self.pack_key(*key_args), fill_cache=fill_cache)
if v:
return self.unpack_value(v)
return v if not deserialize_value else self.unpack_value(v)
def stage_put(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))
@ -303,6 +314,28 @@ class ChannelToClaimValue(typing.NamedTuple):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class ChannelCountKey(typing.NamedTuple):
channel_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(channel_hash={self.channel_hash.hex()})"
class ChannelCountValue(typing.NamedTuple):
count: int
class SupportAmountKey(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class SupportAmountValue(typing.NamedTuple):
amount: int
class ClaimToSupportKey(typing.NamedTuple):
claim_hash: bytes
tx_num: int
@ -469,6 +502,20 @@ class TouchedOrDeletedClaimValue(typing.NamedTuple):
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
class DBState(typing.NamedTuple):
genesis: bytes
height: int
tx_count: int
tip: bytes
utxo_flush_count: int
wall_time: int
first_sync: bool
db_version: int
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
class ActiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.active_amount.value
key_struct = struct.Struct(b'>20sBLLH')
@ -514,9 +561,7 @@ class ClaimToTXOPrefixRow(PrefixRow):
@classmethod
def pack_key(cls, claim_hash: bytes):
return super().pack_key(
claim_hash
)
return super().pack_key(claim_hash)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimToTXOKey:
@ -972,6 +1017,11 @@ class EffectiveAmountPrefixRow(PrefixRow):
class RepostPrefixRow(PrefixRow):
prefix = DB_PREFIXES.repost.value
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes):
return cls.prefix + claim_hash
@ -1031,6 +1081,11 @@ class UndoPrefixRow(PrefixRow):
prefix = DB_PREFIXES.undo.value
key_struct = struct.Struct(b'>Q')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>Q').pack
]
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@ -1059,6 +1114,11 @@ class BlockHashPrefixRow(PrefixRow):
key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>32s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int) -> bytes:
return super().pack_key(height)
@ -1085,6 +1145,11 @@ class BlockHeaderPrefixRow(PrefixRow):
key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>112s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int) -> bytes:
return super().pack_key(height)
@ -1111,6 +1176,11 @@ class TXNumPrefixRow(PrefixRow):
key_struct = struct.Struct(b'>32s')
value_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>32s').pack
]
@classmethod
def pack_key(cls, tx_hash: bytes) -> bytes:
return super().pack_key(tx_hash)
@ -1137,6 +1207,11 @@ class TxCountPrefixRow(PrefixRow):
key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int) -> bytes:
return super().pack_key(height)
@ -1163,6 +1238,11 @@ class TXHashPrefixRow(PrefixRow):
key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>32s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, tx_num: int) -> bytes:
return super().pack_key(tx_num)
@ -1188,6 +1268,11 @@ class TXPrefixRow(PrefixRow):
prefix = DB_PREFIXES.tx.value
key_struct = struct.Struct(b'>32s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>32s').pack
]
@classmethod
def pack_key(cls, tx_hash: bytes) -> bytes:
return super().pack_key(tx_hash)
@ -1313,6 +1398,10 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_diff.value
key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>LL')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int):
@ -1324,16 +1413,19 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
@classmethod
def pack_value(cls, touched, deleted) -> bytes:
assert True if not touched else all(len(item) == 20 for item in touched)
assert True if not deleted else all(len(item) == 20 for item in deleted)
return cls.value_struct.pack(len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted)
@classmethod
def unpack_value(cls, data: bytes) -> TouchedOrDeletedClaimValue:
touched_len, deleted_len = cls.value_struct.unpack(data[:8])
assert len(data) == 20 * (touched_len + deleted_len) + 8
touched_bytes, deleted_bytes = data[8:touched_len*20+8], data[touched_len*20+8:touched_len*20+deleted_len*20+8]
data = data[8:]
assert len(data) == 20 * (touched_len + deleted_len)
touched_bytes, deleted_bytes = data[:touched_len*20], data[touched_len*20:]
return TouchedOrDeletedClaimValue(
{touched_bytes[8+20*i:8+20*(i+1)] for i in range(touched_len)},
{deleted_bytes[8+20*i:8+20*(i+1)] for i in range(deleted_len)}
{touched_bytes[20*i:20*(i+1)] for i in range(touched_len)},
{deleted_bytes[20*i:20*(i+1)] for i in range(deleted_len)}
)
@classmethod
@ -1341,87 +1433,170 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(touched, deleted)
class Prefixes:
claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow
class ChannelCountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.channel_count.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
claim_to_txo = ClaimToTXOPrefixRow
txo_to_claim = TXOToClaimPrefixRow
@classmethod
def pack_key(cls, channel_hash: int):
return super().pack_key(channel_hash)
claim_to_channel = ClaimToChannelPrefixRow
channel_to_claim = ChannelToClaimPrefixRow
@classmethod
def unpack_key(cls, key: bytes) -> ChannelCountKey:
return ChannelCountKey(*super().unpack_key(key))
claim_short_id = ClaimShortIDPrefixRow
claim_expiration = ClaimExpirationPrefixRow
@classmethod
def pack_value(cls, count: int) -> bytes:
return super().pack_value(count)
claim_takeover = ClaimTakeoverPrefixRow
pending_activation = PendingActivationPrefixRow
activated = ActivatedPrefixRow
active_amount = ActiveAmountPrefixRow
@classmethod
def unpack_value(cls, data: bytes) -> ChannelCountValue:
return ChannelCountValue(*super().unpack_value(data))
effective_amount = EffectiveAmountPrefixRow
repost = RepostPrefixRow
reposted_claim = RepostedPrefixRow
undo = UndoPrefixRow
utxo = UTXOPrefixRow
hashX_utxo = HashXUTXOPrefixRow
hashX_history = HashXHistoryPrefixRow
block_hash = BlockHashPrefixRow
tx_count = TxCountPrefixRow
tx_hash = TXHashPrefixRow
tx_num = TXNumPrefixRow
tx = TXPrefixRow
header = BlockHeaderPrefixRow
touched_or_deleted = TouchedOrDeletedPrefixRow
@classmethod
def pack_item(cls, channel_hash, count):
return cls.pack_key(channel_hash), cls.pack_value(count)
class PrefixDB:
def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
class SupportAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.support_amount.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>Q')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
self.claim_to_support = ClaimToSupportPrefixRow(db, op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, op_stack)
self.claim_to_txo = ClaimToTXOPrefixRow(db, op_stack)
self.txo_to_claim = TXOToClaimPrefixRow(db, op_stack)
self.claim_to_channel = ClaimToChannelPrefixRow(db, op_stack)
self.channel_to_claim = ChannelToClaimPrefixRow(db, op_stack)
self.claim_short_id = ClaimShortIDPrefixRow(db, op_stack)
self.claim_expiration = ClaimExpirationPrefixRow(db, op_stack)
self.claim_takeover = ClaimTakeoverPrefixRow(db, op_stack)
self.pending_activation = PendingActivationPrefixRow(db, op_stack)
self.activated = ActivatedPrefixRow(db, op_stack)
self.active_amount = ActiveAmountPrefixRow(db, op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, op_stack)
self.repost = RepostPrefixRow(db, op_stack)
self.reposted_claim = RepostedPrefixRow(db, op_stack)
self.undo = UndoPrefixRow(db, op_stack)
self.utxo = UTXOPrefixRow(db, op_stack)
self.hashX_utxo = HashXUTXOPrefixRow(db, op_stack)
self.hashX_history = HashXHistoryPrefixRow(db, op_stack)
self.block_hash = BlockHashPrefixRow(db, op_stack)
self.tx_count = TxCountPrefixRow(db, op_stack)
self.tx_hash = TXHashPrefixRow(db, op_stack)
self.tx_num = TXNumPrefixRow(db, op_stack)
self.tx = TXPrefixRow(db, op_stack)
self.header = BlockHeaderPrefixRow(db, op_stack)
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack)
@classmethod
def pack_key(cls, claim_hash: bytes):
return super().pack_key(claim_hash)
def commit(self):
try:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
@classmethod
def unpack_key(cls, key: bytes) -> SupportAmountKey:
return SupportAmountKey(*super().unpack_key(key))
for staged_change in self._op_stack:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
finally:
self._op_stack.clear()
@classmethod
def pack_value(cls, amount: int) -> bytes:
return super().pack_value(amount)
@classmethod
def unpack_value(cls, data: bytes) -> SupportAmountValue:
return SupportAmountValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, claim_hash, amount):
return cls.pack_key(claim_hash), cls.pack_value(amount)
class DBStatePrefixRow(PrefixRow):
prefix = DB_PREFIXES.db_state.value
value_struct = struct.Struct(b'>32sLL32sLLBBlll')
key_struct = struct.Struct(b'')
key_part_lambdas = [
lambda: b''
]
@classmethod
def pack_key(cls) -> bytes:
return cls.prefix
@classmethod
def unpack_key(cls, key: bytes):
return
@classmethod
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int) -> bytes:
return super().pack_value(
genesis, height, tx_count, tip, utxo_flush_count,
wall_time, 1 if first_sync else 0, db_version, hist_flush_count,
comp_flush_count, comp_cursor
)
@classmethod
def unpack_value(cls, data: bytes) -> DBState:
return DBState(*super().unpack_value(data))
@classmethod
def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int):
return cls.pack_key(), cls.pack_value(
genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync, db_version, hist_flush_count,
comp_flush_count, comp_cursor
)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=max_open_files,
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self.db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def write_batch(self, transaction: bool = False, sync: bool = False):
return self.db.write_batch(transaction=transaction, sync=sync)
def close(self):
return self.db.close()
@property
def closed(self) -> bool:
return self.db.closed
class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int, max_open_files: int = 512):
db = LevelDBStore(path, cache_mb, max_open_files)
super().__init__(db, unsafe_prefixes={DB_PREFIXES.db_state.value})
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack)
self.txo_to_claim = TXOToClaimPrefixRow(db, self._op_stack)
self.claim_to_channel = ClaimToChannelPrefixRow(db, self._op_stack)
self.channel_to_claim = ChannelToClaimPrefixRow(db, self._op_stack)
self.claim_short_id = ClaimShortIDPrefixRow(db, self._op_stack)
self.claim_expiration = ClaimExpirationPrefixRow(db, self._op_stack)
self.claim_takeover = ClaimTakeoverPrefixRow(db, self._op_stack)
self.pending_activation = PendingActivationPrefixRow(db, self._op_stack)
self.activated = ActivatedPrefixRow(db, self._op_stack)
self.active_amount = ActiveAmountPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
self.repost = RepostPrefixRow(db, self._op_stack)
self.reposted_claim = RepostedPrefixRow(db, self._op_stack)
self.undo = UndoPrefixRow(db, self._op_stack)
self.utxo = UTXOPrefixRow(db, self._op_stack)
self.hashX_utxo = HashXUTXOPrefixRow(db, self._op_stack)
self.hashX_history = HashXHistoryPrefixRow(db, self._op_stack)
self.block_hash = BlockHashPrefixRow(db, self._op_stack)
self.tx_count = TxCountPrefixRow(db, self._op_stack)
self.tx_hash = TXHashPrefixRow(db, self._op_stack)
self.tx_num = TXNumPrefixRow(db, self._op_stack)
self.tx = TXPrefixRow(db, self._op_stack)
self.header = BlockHeaderPrefixRow(db, self._op_stack)
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, self._op_stack)
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
self.db_state = DBStatePrefixRow(db, self._op_stack)
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -14,10 +14,8 @@ import array
import time
import typing
import struct
import attr
import zlib
import base64
import plyvel
from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List, TYPE_CHECKING
from functools import partial
from asyncio import sleep
@ -32,10 +30,8 @@ from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str
from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.revertable import RevertableOpStack
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
from lbry.wallet.transaction import OutputScript
@ -59,46 +55,8 @@ class UTXO(typing.NamedTuple):
TXO_STRUCT = struct.Struct(b'>LH')
TXO_STRUCT_unpack = TXO_STRUCT.unpack
TXO_STRUCT_pack = TXO_STRUCT.pack
@attr.s(slots=True)
class FlushData:
height = attr.ib()
tx_count = attr.ib()
put_and_delete_ops = attr.ib()
tip = attr.ib()
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]]
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sLLBBlll')
DB_STATE_STRUCT_SIZE = 94
class DBState(typing.NamedTuple):
genesis: bytes
height: int
tx_count: int
tip: bytes
utxo_flush_count: int
wall_time: int
first_sync: bool
db_version: int
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
def pack(self) -> bytes:
return DB_STATE_STRUCT.pack(
self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count,
self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count,
self.comp_flush_count, self.comp_cursor
)
@classmethod
def unpack(cls, packed: bytes) -> 'DBState':
return cls(*DB_STATE_STRUCT.unpack(packed[:DB_STATE_STRUCT_SIZE]))
class DBError(Exception):
"""Raised on general DB errors generally indicating corruption."""
@ -114,7 +72,6 @@ class LevelDB:
self.logger.info(f'switching current directory to {env.db_dir}')
self.db = None
self.prefix_db = None
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
@ -149,8 +106,6 @@ class LevelDB:
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
self.total_transactions = None
self.transaction_num_mapping = {}
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
@ -173,65 +128,57 @@ class LevelDB:
self.ledger = RegTestLedger
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
claim_hash_and_name = self.prefix_db.txo_to_claim.get(tx_num, tx_idx)
if not claim_hash_and_name:
return
return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name)
return claim_hash_and_name
def get_repost(self, claim_hash) -> Optional[bytes]:
repost = self.db.get(Prefixes.repost.pack_key(claim_hash))
repost = self.prefix_db.repost.get(claim_hash)
if repost:
return Prefixes.repost.unpack_value(repost).reposted_claim_hash
return repost.reposted_claim_hash
return
def get_reposted_count(self, claim_hash: bytes) -> int:
cnt = 0
for _ in self.db.iterator(prefix=Prefixes.reposted_claim.pack_partial_key(claim_hash)):
cnt += 1
return cnt
return sum(
1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False)
)
def get_activation(self, tx_num, position, is_support=False) -> int:
activation = self.db.get(
Prefixes.activated.pack_key(
activation = self.prefix_db.activated.get(
ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position
)
)
if activation:
return Prefixes.activated.unpack_value(activation).height
return activation.height
return -1
def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]:
key = Prefixes.support_to_claim.pack_key(tx_num, position)
supported_claim_hash = self.db.get(key)
supported_claim_hash = self.prefix_db.support_to_claim.get(tx_num, position)
if supported_claim_hash:
packed_support_amount = self.db.get(
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, position)
packed_support_amount = self.prefix_db.claim_to_support.get(
supported_claim_hash.claim_hash, tx_num, position
)
if packed_support_amount:
return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount
return supported_claim_hash.claim_hash, packed_support_amount.amount
return None, None
def get_support_amount(self, claim_hash: bytes):
total = 0
for packed in self.db.iterator(prefix=Prefixes.claim_to_support.pack_partial_key(claim_hash), include_key=False):
total += Prefixes.claim_to_support.unpack_value(packed).amount
return total
support_amount_val = self.prefix_db.support_amount.get(claim_hash)
if support_amount_val is None:
return 0
return support_amount_val.amount
def get_supports(self, claim_hash: bytes):
supports = []
for k, v in self.db.iterator(prefix=Prefixes.claim_to_support.pack_partial_key(claim_hash)):
unpacked_k = Prefixes.claim_to_support.unpack_key(k)
unpacked_v = Prefixes.claim_to_support.unpack_value(v)
supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount))
return supports
return [
(k.tx_num, k.position, v.amount) for k, v in self.prefix_db.claim_to_support.iterate(prefix=(claim_hash,))
]
def get_short_claim_id_url(self, name: str, normalized_name: str, claim_hash: bytes,
root_tx_num: int, root_position: int) -> str:
claim_id = claim_hash.hex()
for prefix_len in range(10):
prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:prefix_len+1])
for _k in self.db.iterator(prefix=prefix, include_value=False):
k = Prefixes.claim_short_id.unpack_key(_k)
for k in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:prefix_len+1]),
include_value=False):
if k.root_tx_num == root_tx_num and k.root_position == root_position:
return f'{name}#{k.partial_claim_id}'
break
@ -247,7 +194,7 @@ class LevelDB:
normalized_name = name
controlling_claim = self.get_controlling_claim(normalized_name)
tx_hash = self.total_transactions[tx_num]
tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
height = bisect_right(self.tx_counts, tx_num)
created_height = bisect_right(self.tx_counts, root_tx_num)
last_take_over_height = controlling_claim.height
@ -314,10 +261,7 @@ class LevelDB:
self.get_activation(claim_txo.tx_num, claim_txo.position), claim_txo.channel_signature_is_valid
)
# resolve by partial/complete claim id
prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:10])
for k, v in self.db.iterator(prefix=prefix):
key = Prefixes.claim_short_id.unpack_key(k)
claim_txo = Prefixes.claim_short_id.unpack_value(v)
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])):
claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position]
non_normalized_name = self.claim_to_txo.get(claim_hash).name
signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid
@ -329,12 +273,9 @@ class LevelDB:
return
# resolve by amount ordering, 1 indexed
prefix = Prefixes.effective_amount.pack_partial_key(normalized_name)
for idx, (k, v) in enumerate(self.db.iterator(prefix=prefix)):
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
if amount_order > idx + 1:
continue
key = Prefixes.effective_amount.unpack_key(k)
claim_val = Prefixes.effective_amount.unpack_value(v)
claim_txo = self.claim_to_txo.get(claim_val.claim_hash)
activation = self.get_activation(key.tx_num, key.position)
return self._prepare_resolve_result(
@ -345,9 +286,7 @@ class LevelDB:
def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str):
candidates = []
for k, v in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash, normalized_name)):
key = Prefixes.channel_to_claim.unpack_key(k)
stream = Prefixes.channel_to_claim.unpack_value(v)
for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)):
effective_amount = self.get_effective_amount(stream.claim_hash)
if not candidates or candidates[-1][-1] == effective_amount:
candidates.append((stream.claim_hash, key.tx_num, key.position, effective_amount))
@ -362,7 +301,7 @@ class LevelDB:
try:
parsed = URL.parse(url)
except ValueError as e:
return e, None
return e, None, None
stream = channel = resolved_channel = resolved_stream = None
if parsed.has_stream_in_channel:
@ -428,9 +367,9 @@ class LevelDB:
return claim.amount
def get_block_hash(self, height: int) -> Optional[bytes]:
v = self.db.get(Prefixes.block_hash.pack_key(height))
v = self.prefix_db.block_hash.get(height)
if v:
return Prefixes.block_hash.unpack_value(v).block_hash
return v.block_hash
def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]:
v = self.prefix_db.claim_to_support.get(claim_hash, tx_num, position)
@ -467,19 +406,19 @@ class LevelDB:
def get_claims_for_name(self, name):
claims = []
prefix = Prefixes.claim_short_id.pack_partial_key(name) + bytes([1])
for _k, _v in self.db.iterator(prefix=prefix):
v = Prefixes.claim_short_id.unpack_value(_v)
prefix = self.prefix_db.claim_short_id.pack_partial_key(name) + bytes([1])
for _k, _v in self.prefix_db.iterator(prefix=prefix):
v = self.prefix_db.claim_short_id.unpack_value(_v)
claim_hash = self.get_claim_from_txo(v.tx_num, v.position).claim_hash
if claim_hash not in claims:
claims.append(claim_hash)
return claims
def get_claims_in_channel_count(self, channel_hash) -> int:
count = 0
for _ in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash,), include_key=False):
count += 1
return count
channel_count_val = self.prefix_db.channel_count.get(channel_hash)
if channel_count_val is None:
return 0
return channel_count_val.count
async def reload_blocking_filtering_streams(self):
def reload():
@ -506,14 +445,15 @@ class LevelDB:
return streams, channels
def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]:
return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))
v = self.prefix_db.claim_to_channel.get(claim_hash, tx_num, position)
if v:
return v.signing_hash
def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
expired = {}
for _k, _v in self.db.iterator(prefix=Prefixes.claim_expiration.pack_partial_key(height)):
k, v = Prefixes.claim_expiration.unpack_item(_k, _v)
tx_hash = self.total_transactions[k.tx_num]
tx = self.coin.transaction(self.db.get(Prefixes.tx.pack_key(tx_hash)))
for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)):
tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False)
tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False))
# treat it like a claim spend so it will delete/abandon properly
# the _spend_claim function this result is fed to expects a txi, so make a mock one
# print(f"\texpired lbry://{v.name} {v.claim_hash.hex()}")
@ -524,21 +464,21 @@ class LevelDB:
return expired
def get_controlling_claim(self, name: str) -> Optional[ClaimTakeoverValue]:
controlling = self.db.get(Prefixes.claim_takeover.pack_key(name))
controlling = self.prefix_db.claim_takeover.get(name)
if not controlling:
return
return Prefixes.claim_takeover.unpack_value(controlling)
return controlling
def get_claim_txos_for_name(self, name: str):
txos = {}
prefix = Prefixes.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big')
for k, v in self.db.iterator(prefix=prefix):
tx_num, nout = Prefixes.claim_short_id.unpack_value(v)
prefix = self.prefix_db.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big')
for k, v in self.prefix_db.iterator(prefix=prefix):
tx_num, nout = self.prefix_db.claim_short_id.unpack_value(v)
txos[self.get_claim_from_txo(tx_num, nout).claim_hash] = tx_num, nout
return txos
def get_claim_metadata(self, tx_hash, nout):
raw = self.db.get(Prefixes.tx.pack_key(tx_hash))
raw = self.prefix_db.tx.get(tx_hash, deserialize_value=False)
try:
output = self.coin.transaction(raw).outputs[nout]
script = OutputScript(output.pk_script)
@ -547,7 +487,7 @@ class LevelDB:
except:
self.logger.error(
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
raw.hex()
(raw or b'').hex()
)
return
@ -577,7 +517,7 @@ class LevelDB:
if not reposted_claim:
return
reposted_metadata = self.get_claim_metadata(
self.total_transactions[reposted_claim.tx_num], reposted_claim.position
self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position
)
if not reposted_metadata:
return
@ -591,8 +531,8 @@ class LevelDB:
reposted_fee_currency = None
reposted_duration = None
if reposted_claim:
reposted_tx_hash = self.total_transactions[reposted_claim.tx_num]
raw_reposted_claim_tx = self.db.get(Prefixes.tx.pack_key(reposted_tx_hash))
reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False)
raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False)
try:
reposted_claim_txo = self.coin.transaction(
raw_reposted_claim_tx
@ -727,7 +667,7 @@ class LevelDB:
batch = []
for claim_hash, claim_txo in self.claim_to_txo.items():
# TODO: fix the couple of claim txos that dont have controlling names
if not self.db.get(Prefixes.claim_takeover.pack_key(claim_txo.normalized_name)):
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
continue
claim = self._fs_get_claim_by_hash(claim_hash)
if claim:
@ -793,23 +733,17 @@ class LevelDB:
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)
for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)):
k = Prefixes.pending_activation.unpack_key(_k)
v = Prefixes.pending_activation.unpack_value(_v)
for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):
activated[v].append(k)
return activated
def get_future_activated(self, height: int) -> typing.Generator[
Tuple[PendingActivationValue, PendingActivationKey], None, None]:
yielded = set()
start_prefix = Prefixes.pending_activation.pack_partial_key(height + 1)
stop_prefix = Prefixes.pending_activation.pack_partial_key(height + 1 + self.coin.maxTakeoverDelay)
for _k, _v in self.db.iterator(start=start_prefix, stop=stop_prefix, reverse=True):
if _v not in yielded:
yielded.add(_v)
v = Prefixes.pending_activation.unpack_value(_v)
k = Prefixes.pending_activation.unpack_key(_k)
yield v, k
def get_future_activated(self, height: int) -> typing.Dict[PendingActivationValue, PendingActivationKey]:
results = {}
for k, v in self.prefix_db.pending_activation.iterate(
start=(height + 1,), stop=(height + 1 + self.coin.maxTakeoverDelay,), reverse=True):
if v not in results:
results[v] = k
return results
async def _read_tx_counts(self):
if self.tx_counts is not None:
@ -818,11 +752,9 @@ class LevelDB:
# height N. So tx_counts[0] is 1 - the genesis coinbase
def get_counts():
return tuple(
Prefixes.tx_count.unpack_value(packed_tx_count).tx_count
for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False,
fill_cache=False)
)
return [
v.tx_count for v in self.prefix_db.tx_count.iterate(include_key=False, fill_cache=False)
]
tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
@ -834,20 +766,6 @@ class LevelDB:
else:
assert self.db_tx_count == 0
async def _read_txids(self):
def get_txids():
return list(self.db.iterator(prefix=Prefixes.tx_hash.prefix, include_key=False, fill_cache=False))
start = time.perf_counter()
self.logger.info("loading txids")
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
self.total_transactions = txids
self.transaction_num_mapping = {
txid: i for i, txid in enumerate(txids)
}
ts = time.perf_counter() - start
self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4))
async def _read_claim_txos(self):
def read_claim_txos():
@ -870,8 +788,9 @@ class LevelDB:
def get_headers():
return [
header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False,
fill_cache=False)
header for header in self.prefix_db.header.iterate(
include_key=False, fill_cache=False, deserialize_value=False
)
]
headers = await asyncio.get_event_loop().run_in_executor(None, get_headers)
@ -884,23 +803,13 @@ class LevelDB:
return int(160.6855883050695 * height)
async def open_dbs(self):
if self.db:
if self.prefix_db and not self.prefix_db.closed:
return
path = os.path.join(self.env.db_dir, 'lbry-leveldb')
is_new = os.path.isdir(path)
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=512,
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
self.prefix_db = HubDB(
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.cache_MB, max_open_files=512
)
self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value})
self.prefix_db = PrefixDB(self.db, self.db_op_stack)
if is_new:
self.logger.info('created new db: %s', f'lbry-leveldb')
else:
self.logger.info(f'opened db: %s', f'lbry-leveldb')
self.logger.info(f'opened db: lbry-leveldb')
# read db state
self.read_db_state()
@ -929,8 +838,6 @@ class LevelDB:
# Read TX counts (requires meta directory)
await self._read_tx_counts()
if self.total_transactions is None:
await self._read_txids()
await self._read_headers()
await self._read_claim_txos()
@ -938,7 +845,7 @@ class LevelDB:
await self.search_index.start()
def close(self):
self.db.close()
self.prefix_db.close()
# Header merkle cache
@ -953,113 +860,6 @@ class LevelDB:
async def header_branch_and_root(self, length, height):
return await self.header_mc.branch_and_root(length, height)
# Flushing
def assert_flushed(self, flush_data):
"""Asserts state is fully flushed."""
assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
assert flush_data.height == self.fs_height == self.db_height
assert flush_data.tip == self.db_tip
assert not len(flush_data.put_and_delete_ops)
def flush_dbs(self, flush_data: FlushData):
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
min_height = self.min_undo_height(self.db_height)
delete_undo_keys = []
if min_height > 0: # delete undos for blocks deep enough they can't be reorged
delete_undo_keys.extend(
self.db.iterator(
start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False
)
)
delete_undo_keys.extend(
self.db.iterator(
start=Prefixes.touched_or_deleted.pack_key(0),
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
)
)
with self.db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in flush_data.put_and_delete_ops:
if staged_change.is_put:
batch_put(staged_change.key, staged_change.value)
else:
batch_delete(staged_change.key)
for delete_key in delete_undo_keys:
batch_delete(delete_key)
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
self.hist_flush_count += 1
self.hist_unflushed_count = 0
self.utxo_flush_count = self.hist_flush_count
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
self.last_flush_tx_count = self.fs_tx_count
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.write_db_state(batch)
def flush_backup(self, flush_data):
assert flush_data.height < self.db_height
assert not self.hist_unflushed
start_time = time.time()
tx_delta = flush_data.tx_count - self.last_flush_tx_count
###
self.fs_tx_count = flush_data.tx_count
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(flush_data.height + 1)
###
# Not certain this is needed, but it doesn't hurt
self.hist_flush_count += 1
nremoves = 0
with self.db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for op in flush_data.put_and_delete_ops:
# print("REWIND", op)
if op.is_put:
batch_put(op.key, op.value)
else:
batch_delete(op.key)
while self.fs_height > flush_data.height:
self.fs_height -= 1
start_time = time.time()
if self.db.for_sync:
block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time
self.logger.info(f'flushed {block_count:,d} blocks with '
f'{tx_count:,d} txs in '
f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.hist_flush_count
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
# Flush state last as it reads the wall time.
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_db_state(batch)
self.logger.info(f'backing up removed {nremoves:,d} history entries')
elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. '
f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def raw_header(self, height):
"""Return the binary header at the given height."""
header, n = self.read_headers(height, 1)
@ -1103,7 +903,7 @@ class LevelDB:
if tx_height > self.db_height:
return None, tx_height
try:
return self.total_transactions[tx_num], tx_height
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height
except IndexError:
self.logger.exception(
"Failed to access a cached transaction, known bug #3142 "
@ -1111,9 +911,17 @@ class LevelDB:
)
return None, tx_height
def get_block_txs(self, height: int) -> List[bytes]:
return [
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
deserialize_value=False, include_key=False
)
]
def _fs_transactions(self, txids: Iterable[str]):
tx_counts = self.tx_counts
tx_db_get = self.db.get
tx_db_get = self.prefix_db.tx.get
tx_cache = self._tx_and_merkle_cache
tx_infos = {}
@ -1123,13 +931,14 @@ class LevelDB:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.transaction_num_mapping.get(tx_hash_bytes)
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(tx_counts, tx_num)
tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes), fill_cache=fill_cache)
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
@ -1137,7 +946,7 @@ class LevelDB:
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
@ -1160,15 +969,17 @@ class LevelDB:
raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]:
txs = array.array('I')
for hist in self.db.iterator(prefix=Prefixes.hashX_history.pack_partial_key(hashX), include_key=False):
a = array.array('I')
a.frombytes(hist)
txs.extend(a)
def read_history(self, hashX: bytes, limit: int = 1000) -> List[Tuple[bytes, int]]:
txs = []
txs_extend = txs.extend
for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
txs_extend([
(self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num))
for tx_num in hist
])
if len(txs) >= limit:
break
return txs.tolist()
return txs
async def limited_history(self, hashX, *, limit=1000):
"""Return an unpruned, sorted list of (tx_hash, height) tuples of
@ -1177,13 +988,7 @@ class LevelDB:
transactions. By default returns at most 1000 entries. Set
limit to None to get them all.
"""
while True:
history = await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit)
if history is not None:
return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history]
self.logger.warning(f'limited_history: tx hash '
f'not found (reorg?), retrying...')
await sleep(0.25)
return await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit)
# -- Undo information
@ -1191,45 +996,33 @@ class LevelDB:
"""Returns a height from which we should store undo info."""
return max_height - self.env.reorg_limit + 1
def undo_key(self, height: int) -> bytes:
"""DB key for undo information at the given height."""
return Prefixes.undo.pack_key(height)
def read_undo_info(self, height: int):
return self.db.get(Prefixes.undo.pack_key(height)), self.db.get(Prefixes.touched_or_deleted.pack_key(height))
return self.prefix_db.undo.get(height), self.prefix_db.touched_or_deleted.get(height)
def apply_expiration_extension_fork(self):
# TODO: this can't be reorged
deletes = []
adds = []
for k, v in self.db.iterator(prefix=Prefixes.claim_expiration.prefix):
old_key = Prefixes.claim_expiration.unpack_key(k)
new_key = Prefixes.claim_expiration.pack_key(
bisect_right(self.tx_counts, old_key.tx_num) + self.coin.nExtendedClaimExpirationTime,
old_key.tx_num, old_key.position
for k, v in self.prefix_db.claim_expiration.iterate():
self.prefix_db.claim_expiration.stage_delete(k, v)
self.prefix_db.claim_expiration.stage_put(
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
k.tx_num, k.position), v
)
deletes.append(k)
adds.append((new_key, v))
with self.db.write_batch(transaction=True) as batch:
for k in deletes:
batch.delete(k)
for k, v in adds:
batch.put(k, v)
self.prefix_db.unsafe_commit()
def write_db_state(self, batch):
def write_db_state(self):
"""Write (UTXO) state to the batch."""
batch.put(
DB_PREFIXES.db_state.value,
DBState(
if self.db_height > 0:
self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
).pack()
)
)
def read_db_state(self):
state = self.db.get(DB_PREFIXES.db_state.value)
state = self.prefix_db.db_state.get()
if not state:
self.db_height = -1
self.db_tx_count = 0
@ -1243,7 +1036,6 @@ class LevelDB:
self.hist_comp_cursor = -1
self.hist_db_version = max(self.DB_VERSIONS)
else:
state = DBState.unpack(state)
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
raise DBError(f'your DB version is {self.db_version} but this '
@ -1264,15 +1056,21 @@ class LevelDB:
self.hist_comp_cursor = state.comp_cursor
self.hist_db_version = state.db_version
def assert_db_state(self):
state = self.prefix_db.db_state.get()
assert self.db_version == state.db_version
assert self.db_height == state.height
assert self.db_tx_count == state.tx_count
assert self.db_tip == state.tip
assert self.first_sync == state.first_sync
async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""
def read_utxos():
utxos = []
utxos_append = utxos.append
fs_tx_hash = self.fs_tx_hash
for db_key, db_value in self.db.iterator(prefix=Prefixes.utxo.pack_partial_key(hashX)):
k = Prefixes.utxo.unpack_key(db_key)
v = Prefixes.utxo.unpack_value(db_value)
for k, v in self.prefix_db.utxo.iterate(prefix=(hashX, )):
tx_hash, height = fs_tx_hash(k.tx_num)
utxos_append(UTXO(k.tx_num, k.nout, tx_hash, height, v.amount))
return utxos
@ -1290,14 +1088,16 @@ class LevelDB:
utxos = []
utxo_append = utxos.append
for (tx_hash, nout) in prevouts:
if tx_hash not in self.transaction_num_mapping:
tx_num_val = self.prefix_db.tx_num.get(tx_hash)
if not tx_num_val:
continue
tx_num = self.transaction_num_mapping[tx_hash]
hashX = self.db.get(Prefixes.hashX_utxo.pack_key(tx_hash[:4], tx_num, nout))
if not hashX:
tx_num = tx_num_val.tx_num
hashX_val = self.prefix_db.hashX_utxo.get(tx_hash[:4], tx_num, nout)
if not hashX_val:
continue
utxo_value = self.db.get(Prefixes.utxo.pack_key(hashX, tx_num, nout))
hashX = hashX_val.hashX
utxo_value = self.prefix_db.utxo.get(hashX, tx_num, nout)
if utxo_value:
utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount))
utxo_append((hashX, utxo_value.amount))
return utxos
return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos)

View file

@ -115,7 +115,7 @@ class BaseResolveTestCase(CommandTestCase):
def check_supports(claim_id, lbrycrd_supports):
for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))):
support = lbrycrd_supports[i]
self.assertEqual(support['txId'], db.total_transactions[tx_num][::-1].hex())
self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
self.assertEqual(support['n'], position)
self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True))
@ -127,7 +127,7 @@ class BaseResolveTestCase(CommandTestCase):
check_supports(c['claimId'], c['supports'])
claim_hash = bytes.fromhex(c['claimId'])
self.assertEqual(c['validAtHeight'], db.get_activation(
db.transaction_num_mapping[bytes.fromhex(c['txId'])[::-1]], c['n']
db.prefix_db.tx_num.get(bytes.fromhex(c['txId'])[::-1]).tx_num, c['n']
))
self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash))
@ -1451,19 +1451,13 @@ class ResolveAfterReorg(BaseResolveTestCase):
async def assertBlockHash(self, height):
bp = self.conductor.spv_node.server.bp
def get_txids():
return [
bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
]
block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
]
txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')

View file

@ -1,6 +1,8 @@
import unittest
import tempfile
import shutil
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertableDelete, RevertablePut, OpStackIntegrity
from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.prefixes import ClaimToTXOPrefixRow, HubDB
class TestRevertableOpStack(unittest.TestCase):
@ -25,14 +27,14 @@ class TestRevertableOpStack(unittest.TestCase):
self.stack.append_op(RevertablePut(key2, value2))
def test_simplify(self):
key1 = Prefixes.claim_to_txo.pack_key(b'\x01' * 20)
key2 = Prefixes.claim_to_txo.pack_key(b'\x02' * 20)
key3 = Prefixes.claim_to_txo.pack_key(b'\x03' * 20)
key4 = Prefixes.claim_to_txo.pack_key(b'\x04' * 20)
key1 = ClaimToTXOPrefixRow.pack_key(b'\x01' * 20)
key2 = ClaimToTXOPrefixRow.pack_key(b'\x02' * 20)
key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20)
key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20)
val1 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'derp')
val2 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'oops')
val3 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'other')
val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'derp')
val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'oops')
val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'other')
# check that we can't delete a non existent value
with self.assertRaises(OpStackIntegrity):
@ -101,3 +103,48 @@ class TestRevertableOpStack(unittest.TestCase):
self.process_stack()
self.assertDictEqual({key2: val3}, self.fake_db)
class TestRevertablePrefixDB(unittest.TestCase):
def setUp(self):
self.tmp_dir = tempfile.mkdtemp()
self.db = HubDB(self.tmp_dir, cache_mb=1, max_open_files=32)
def tearDown(self) -> None:
self.db.close()
shutil.rmtree(self.tmp_dir)
def test_rollback(self):
name = 'derp'
claim_hash1 = 20 * b'\x00'
claim_hash2 = 20 * b'\x01'
claim_hash3 = 20 * b'\x02'
takeover_height = 10000000
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height))
self.db.commit(10000000)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1))
self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1))
self.db.commit(10000001)
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2))
self.db.commit(10000002)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3))
self.db.commit(10000003)
self.assertEqual(10000003, self.db.claim_takeover.get(name).height)
self.db.rollback(10000003)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.rollback(10000002)
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.rollback(10000001)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.rollback(10000000)
self.assertIsNone(self.db.claim_takeover.get(name))