diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 893eb540c..36f1d5614 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -17,15 +17,11 @@ from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.crypto.hash import hash160 -from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.mempool import MemPool -from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport -from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops -from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE -from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue +from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from lbry.wallet.server.udp import StatusServer -from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack +from lbry.wallet.server.db.revertable import RevertableOpStack if typing.TYPE_CHECKING: from lbry.wallet.server.leveldb import LevelDB @@ -153,6 +149,31 @@ class ChainError(Exception): """Raised on error processing blocks.""" +class StagedClaimtrieItem(typing.NamedTuple): + name: str + normalized_name: str + claim_hash: bytes + amount: int + expiration_height: int + tx_num: int + position: int + root_tx_num: int + root_position: int + channel_signature_is_valid: bool + signing_hash: Optional[bytes] + reposted_claim_hash: Optional[bytes] + + @property + def is_update(self) -> bool: + return (self.tx_num, self.position) != (self.root_tx_num, self.root_position) + + def invalidate_signature(self) -> 'StagedClaimtrieItem': + return StagedClaimtrieItem( + self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, + self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash + ) + + NAMESPACE = "wallet_server" HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') @@ -257,6 +278,7 @@ class BlockProcessor: self.pending_reposted = set() self.pending_channel_counts = defaultdict(lambda: 0) + self.pending_support_amount_change = defaultdict(lambda: 0) self.pending_channels = {} self.amount_cache = {} @@ -266,6 +288,9 @@ class BlockProcessor: self.claim_channels: Dict[bytes, bytes] = {} self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list) + self.pending_transaction_num_mapping: Dict[bytes, int] = {} + self.pending_transactions: Dict[int, bytes] = {} + async def claim_producer(self): if self.db.db_height <= 1: return @@ -319,7 +344,7 @@ class BlockProcessor: self.logger.warning( "applying extended claim expiration fork on claims accepted by, %i", self.height ) - await self.run_in_thread(self.db.apply_expiration_extension_fork) + await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork) # TODO: we shouldnt wait on the search index updating before advancing to the next block if not self.db.first_sync: await self.db.reload_blocking_filtering_streams() @@ -362,7 +387,6 @@ class BlockProcessor: assert count > 0, count for _ in range(count): await self.backup_block() - await self.flush() self.logger.info(f'backed up to height {self.height:,d}') await self.db._read_claim_txos() # TODO: don't do this @@ -392,23 +416,12 @@ class BlockProcessor: 'resetting the prefetcher') await self.prefetcher.reset_height(self.height) - # - Flushing - def flush_data(self): - """The data for a flush. The lock must be taken.""" - assert self.state_lock.locked() - return FlushData(self.height, self.tx_count, self.db_op_stack, self.tip) - async def flush(self): def flush(): - self.db.flush_dbs(self.flush_data()) + self.db.write_db_state() + self.db.prefix_db.commit(self.height) self.clear_after_advance_or_reorg() - await self.run_in_thread_with_lock(flush) - - async def write_state(self): - def flush(): - with self.db.db.write_batch(transaction=True) as batch: - self.db.write_db_state(batch) - + self.db.assert_db_state() await self.run_in_thread_with_lock(flush) def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, @@ -456,7 +469,11 @@ class BlockProcessor: signing_channel = self.db.get_claim_txo(signing_channel_hash) if signing_channel: - raw_channel_tx = self.db.prefix_db.tx.get(self.db.total_transactions[signing_channel.tx_num]).raw_tx + raw_channel_tx = self.db.prefix_db.tx.get( + self.db.prefix_db.tx_hash.get( + signing_channel.tx_num, deserialize_value=False + ), deserialize_value=False + ) channel_pub_key_bytes = None try: if not signing_channel: @@ -503,11 +520,9 @@ class BlockProcessor: root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position activation = self.db.get_activation(prev_tx_num, prev_idx) claim_name = previous_claim.name - self.db_op_stack.extend_ops( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, normalized_name, - previous_claim.amount - ).get_remove_activate_ops() + self.get_remove_activate_ops( + ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, normalized_name, + previous_claim.amount ) previous_amount = previous_claim.amount self.updated_claims.add(claim_hash) @@ -523,16 +538,101 @@ class BlockProcessor: ) self.txo_to_claim[(tx_num, nout)] = pending self.claim_hash_to_txo[claim_hash] = (tx_num, nout) - self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops()) + self.get_add_claim_utxo_ops(pending) + + def get_add_claim_utxo_ops(self, pending: StagedClaimtrieItem): + # claim tip by claim hash + self.db.prefix_db.claim_to_txo.stage_put( + (pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, + pending.amount, pending.channel_signature_is_valid, pending.name) + ) + # claim hash by txo + self.db.prefix_db.txo_to_claim.stage_put( + (pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name) + ) + + # claim expiration + self.db.prefix_db.claim_expiration.stage_put( + (pending.expiration_height, pending.tx_num, pending.position), + (pending.claim_hash, pending.normalized_name) + ) + + # short url resolution + for prefix_len in range(10): + self.db.prefix_db.claim_short_id.stage_put( + (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], + pending.root_tx_num, pending.root_position), + (pending.tx_num, pending.position) + ) + + if pending.signing_hash and pending.channel_signature_is_valid: + # channel by stream + self.db.prefix_db.claim_to_channel.stage_put( + (pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,) + ) + # stream by channel + self.db.prefix_db.channel_to_claim.stage_put( + (pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position), + (pending.claim_hash,) + ) + + if pending.reposted_claim_hash: + self.db.prefix_db.repost.stage_put((pending.claim_hash,), (pending.reposted_claim_hash,)) + self.db.prefix_db.reposted_claim.stage_put( + (pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,) + ) + + def get_remove_claim_utxo_ops(self, pending: StagedClaimtrieItem): + # claim tip by claim hash + self.db.prefix_db.claim_to_txo.stage_delete( + (pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, + pending.amount, pending.channel_signature_is_valid, pending.name) + ) + # claim hash by txo + self.db.prefix_db.txo_to_claim.stage_delete( + (pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name) + ) + + # claim expiration + self.db.prefix_db.claim_expiration.stage_delete( + (pending.expiration_height, pending.tx_num, pending.position), + (pending.claim_hash, pending.normalized_name) + ) + + # short url resolution + for prefix_len in range(10): + self.db.prefix_db.claim_short_id.stage_delete( + (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], + pending.root_tx_num, pending.root_position), + (pending.tx_num, pending.position) + ) + + if pending.signing_hash and pending.channel_signature_is_valid: + # channel by stream + self.db.prefix_db.claim_to_channel.stage_delete( + (pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,) + ) + # stream by channel + self.db.prefix_db.channel_to_claim.stage_delete( + (pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position), + (pending.claim_hash,) + ) + + if pending.reposted_claim_hash: + self.db.prefix_db.repost.stage_delete((pending.claim_hash,), (pending.reposted_claim_hash,)) + self.db.prefix_db.reposted_claim.stage_delete( + (pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,) + ) def _add_support(self, height: int, txo: 'Output', tx_num: int, nout: int): supported_claim_hash = txo.claim_hash[::-1] self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout)) self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount # print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}") - self.db_op_stack.extend_ops(StagedClaimtrieSupport( - supported_claim_hash, tx_num, nout, txo.amount - ).get_add_support_utxo_ops()) + + self.db.prefix_db.claim_to_support.stage_put((supported_claim_hash, tx_num, nout), (txo.amount,)) + self.db.prefix_db.support_to_claim.stage_put((tx_num, nout), (supported_claim_hash,)) + self.pending_support_amount_change[supported_claim_hash] += txo.amount def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output', spent_claims: typing.Dict[bytes, Tuple[int, int, str]]): @@ -542,7 +642,7 @@ class BlockProcessor: self._add_support(height, txo, tx_num, nout) def _spend_support_txo(self, height: int, txin: TxInput): - txin_num = self.db.transaction_num_mapping[txin.prev_hash] + txin_num = self.get_pending_tx_num(txin.prev_hash) activation = 0 if (txin_num, txin.prev_idx) in self.support_txo_to_claim: spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx)) @@ -561,17 +661,17 @@ class BlockProcessor: if 0 < activation < self.height + 1: self.removed_active_support_amount_by_claim[spent_support].append(support_amount) if supported_name is not None and activation > 0: - self.db_op_stack.extend_ops(StagedActivation( + self.get_remove_activate_ops( ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, support_amount - ).get_remove_activate_ops()) + ) # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") - self.db_op_stack.extend_ops(StagedClaimtrieSupport( - spent_support, txin_num, txin.prev_idx, support_amount - ).get_spend_support_txo_ops()) + self.db.prefix_db.claim_to_support.stage_delete((spent_support, txin_num, txin.prev_idx), (support_amount,)) + self.db.prefix_db.support_to_claim.stage_delete((txin_num, txin.prev_idx), (spent_support,)) + self.pending_support_amount_change[spent_support] -= support_amount def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool: - txin_num = self.db.transaction_num_mapping[txin.prev_hash] + txin_num = self.get_pending_tx_num(txin.prev_hash) if (txin_num, txin.prev_idx) in self.txo_to_claim: spent = self.txo_to_claim[(txin_num, txin.prev_idx)] else: @@ -593,7 +693,7 @@ class BlockProcessor: self.pending_channel_counts[spent.signing_hash] -= 1 spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name) # print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") - self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops()) + self.get_remove_claim_utxo_ops(spent) return True def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims): @@ -633,9 +733,31 @@ class BlockProcessor: if normalized_name.startswith('@'): # abandon a channel, invalidate signatures self._invalidate_channel_signatures(claim_hash) + def _get_invalidate_signature_ops(self, pending: StagedClaimtrieItem): + if not pending.signing_hash: + return + self.db.prefix_db.claim_to_channel.stage_delete( + (pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,) + ) + if pending.channel_signature_is_valid: + self.db.prefix_db.channel_to_claim.stage_delete( + (pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position), + (pending.claim_hash,) + ) + self.db.prefix_db.claim_to_txo.stage_delete( + (pending.claim_hash,), + (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount, + pending.channel_signature_is_valid, pending.name) + ) + self.db.prefix_db.claim_to_txo.stage_put( + (pending.claim_hash,), + (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount, + False, pending.name) + ) + def _invalidate_channel_signatures(self, claim_hash: bytes): - for k, signed_claim_hash in self.db.db.iterator( - prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)): + for (signed_claim_hash, ) in self.db.prefix_db.channel_to_claim.iterate( + prefix=(claim_hash, ), include_key=False): if signed_claim_hash in self.abandoned_claims or signed_claim_hash in self.expired_claim_hashes: continue # there is no longer a signing channel for this claim as of this block @@ -657,12 +779,12 @@ class BlockProcessor: claim = self._make_pending_claim_txo(signed_claim_hash) self.signatures_changed.add(signed_claim_hash) self.pending_channel_counts[claim_hash] -= 1 - self.db_op_stack.extend_ops(claim.get_invalidate_signature_ops()) + self._get_invalidate_signature_ops(claim) for staged in list(self.txo_to_claim.values()): needs_invalidate = staged.claim_hash not in self.doesnt_have_valid_signature if staged.signing_hash == claim_hash and needs_invalidate: - self.db_op_stack.extend_ops(staged.get_invalidate_signature_ops()) + self._get_invalidate_signature_ops(staged) self.txo_to_claim[self.claim_hash_to_txo[staged.claim_hash]] = staged.invalidate_signature() self.signatures_changed.add(staged.claim_hash) self.pending_channel_counts[claim_hash] -= 1 @@ -758,6 +880,30 @@ class BlockProcessor: support_amount = self._get_pending_supported_amount(claim_hash, height=height) return claim_amount + support_amount + def get_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int, + activation_height: int, name: str, amount: int): + self.db.prefix_db.activated.stage_put( + (txo_type, tx_num, position), (activation_height, claim_hash, name) + ) + self.db.prefix_db.pending_activation.stage_put( + (activation_height, txo_type, tx_num, position), (claim_hash, name) + ) + self.db.prefix_db.active_amount.stage_put( + (claim_hash, txo_type, activation_height, tx_num, position), (amount,) + ) + + def get_remove_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int, + activation_height: int, name: str, amount: int): + self.db.prefix_db.activated.stage_delete( + (txo_type, tx_num, position), (activation_height, claim_hash, name) + ) + self.db.prefix_db.pending_activation.stage_delete( + (activation_height, txo_type, tx_num, position), (claim_hash, name) + ) + self.db.prefix_db.active_amount.stage_delete( + (claim_hash, txo_type, activation_height, tx_num, position), (amount,) + ) + def _get_takeover_ops(self, height: int): # cache for controlling claims as of the previous block @@ -779,7 +925,7 @@ class BlockProcessor: future_activations = defaultdict(dict) def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int, - amount: int, is_support: bool) -> List['RevertableOp']: + amount: int, is_support: bool): controlling = get_controlling(name) nothing_is_controlling = not controlling staged_is_controlling = False if not controlling else claim_hash == controlling.claim_hash @@ -812,10 +958,10 @@ class BlockProcessor: )) if is_support: self.possible_future_support_txos_by_claim_hash[claim_hash].append((tx_num, nout)) - return StagedActivation( + self.get_activate_ops( ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout, height + delay, name, amount - ).get_activate_ops() + ) # determine names needing takeover/deletion due to controlling claims being abandoned # and add ops to deactivate abandoned claims @@ -827,11 +973,9 @@ class BlockProcessor: activation = self.db.get_activation(staged.tx_num, staged.position) if activation > 0: # db returns -1 for non-existent txos # removed queued future activation from the db - self.db_op_stack.extend_ops( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, - activation, staged.normalized_name, staged.amount - ).get_remove_activate_ops() + self.get_remove_activate_ops( + ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, + activation, staged.normalized_name, staged.amount ) else: # it hadn't yet been activated @@ -868,10 +1012,10 @@ class BlockProcessor: prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position) if height < prev_activation or prev_activation < 0: is_delayed = True - self.db_op_stack.extend_ops(get_delayed_activate_ops( + get_delayed_activate_ops( staged.normalized_name, staged.claim_hash, is_delayed, tx_num, nout, staged.amount, is_support=False - )) + ) # and the supports for (tx_num, nout), (claim_hash, amount) in self.support_txo_to_claim.items(): @@ -889,9 +1033,9 @@ class BlockProcessor: v = supported_claim_info name = v.normalized_name staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position) - self.db_op_stack.extend_ops(get_delayed_activate_ops( + get_delayed_activate_ops( name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True - )) + ) # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): @@ -962,7 +1106,9 @@ class BlockProcessor: if not has_candidate: # remove name takeover entry, the name is now unclaimed controlling = get_controlling(need_takeover) - self.db_op_stack.extend_ops(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) + self.db.prefix_db.claim_takeover.stage_delete( + (need_takeover,), (controlling.claim_hash, controlling.height) + ) # scan for possible takeovers out of the accumulated activations, of these make sure there # aren't any future activations for the taken over names with yet higher amounts, if there are @@ -973,7 +1119,7 @@ class BlockProcessor: # upon the delayed activation of B, we need to detect to activate C and make it take over early instead claim_exists = {} - for activated, activated_claim_txo in self.db.get_future_activated(height): + for activated, activated_claim_txo in self.db.get_future_activated(height).items(): # uses the pending effective amount for the future activation height, not the current height future_amount = self._get_pending_claim_amount( activated.normalized_name, activated.claim_hash, activated_claim_txo.height + 1 @@ -1060,32 +1206,32 @@ class BlockProcessor: break assert None not in (amount, activation) # update the claim that's activating early - self.db_op_stack.extend_ops( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, - position, activation, name, amount - ).get_remove_activate_ops() + \ - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, - position, height, name, amount - ).get_activate_ops() + self.get_remove_activate_ops( + ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, + position, activation, name, amount + ) + self.get_activate_ops( + ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, + position, height, name, amount ) for (k, amount) in activate_in_future[name][winning_including_future_activations]: txo = (k.tx_num, k.position) if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]: - self.db_op_stack.extend_ops( - StagedActivation( - ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, - k.position, k.height, name, amount - ).get_remove_activate_ops() + \ - StagedActivation( - ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, - k.position, height, name, amount - ).get_activate_ops() + self.get_remove_activate_ops( + ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, + k.position, k.height, name, amount + ) + self.get_activate_ops( + ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, + k.position, height, name, amount ) self.taken_over_names.add(name) - self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) + if controlling: + self.db.prefix_db.claim_takeover.stage_delete( + (name,), (controlling.claim_hash, controlling.height) + ) + self.db.prefix_db.claim_takeover.stage_put((name,), (winning_including_future_activations, height)) self.touched_claim_hashes.add(winning_including_future_activations) if controlling and controlling.claim_hash not in self.abandoned_claims: self.touched_claim_hashes.add(controlling.claim_hash) @@ -1106,20 +1252,20 @@ class BlockProcessor: if previous_pending_activate.height > height: # the claim had a pending activation in the future, move it to now if tx_num < self.tx_count: - self.db_op_stack.extend_ops( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, - position, previous_pending_activate.height, name, amount - ).get_remove_activate_ops() - ) - self.db_op_stack.extend_ops( - StagedActivation( + self.get_remove_activate_ops( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, - position, height, name, amount - ).get_activate_ops() + position, previous_pending_activate.height, name, amount + ) + self.get_activate_ops( + ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, + position, height, name, amount ) self.taken_over_names.add(name) - self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) + if controlling: + self.db.prefix_db.claim_takeover.stage_delete( + (name,), (controlling.claim_hash, controlling.height) + ) + self.db.prefix_db.claim_takeover.stage_put((name,), (winning_claim_hash, height)) if controlling and controlling.claim_hash not in self.abandoned_claims: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning_claim_hash) @@ -1147,7 +1293,11 @@ class BlockProcessor: if (controlling and winning != controlling.claim_hash) or (not controlling and winning): self.taken_over_names.add(name) # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") - self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning, height, controlling)) + if controlling: + self.db.prefix_db.claim_takeover.stage_delete( + (name,), (controlling.claim_hash, controlling.height) + ) + self.db.prefix_db.claim_takeover.stage_put((name,), (winning, height)) if controlling: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning) @@ -1185,6 +1335,15 @@ class BlockProcessor: ) ) + # update support amount totals + for supported_claim, amount in self.pending_support_amount_change.items(): + existing = self.db.prefix_db.support_amount.get(supported_claim) + total = amount + if existing is not None: + total += existing.amount + self.db.prefix_db.support_amount.stage_delete((supported_claim,), existing) + self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,)) + # use the cumulative changes to update bid ordered resolve for removed in self.removed_claim_hashes: removed_claim = self.db.get_claim_txo(removed) @@ -1193,10 +1352,9 @@ class BlockProcessor: removed_claim.normalized_name, removed ) if amt: - self.db_op_stack.extend_ops(get_remove_effective_amount_ops( - removed_claim.normalized_name, amt.effective_amount, amt.tx_num, - amt.position, removed - )) + self.db.prefix_db.effective_amount.stage_delete( + (removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,) + ) for touched in self.touched_claim_hashes: prev_effective_amount = 0 @@ -1208,10 +1366,10 @@ class BlockProcessor: claim_amount_info = self.db.get_url_effective_amount(name, touched) if claim_amount_info: prev_effective_amount = claim_amount_info.effective_amount - self.db_op_stack.extend_ops(get_remove_effective_amount_ops( - name, claim_amount_info.effective_amount, claim_amount_info.tx_num, - claim_amount_info.position, touched - )) + self.db.prefix_db.effective_amount.stage_delete( + (name, claim_amount_info.effective_amount, claim_amount_info.tx_num, + claim_amount_info.position), (touched,) + ) else: v = self.db.get_claim_txo(touched) if not v: @@ -1220,10 +1378,8 @@ class BlockProcessor: amt = self.db.get_url_effective_amount(name, touched) if amt: prev_effective_amount = amt.effective_amount - self.db_op_stack.extend_ops( - get_remove_effective_amount_ops( - name, amt.effective_amount, amt.tx_num, amt.position, touched - ) + self.db.prefix_db.effective_amount.stage_delete( + (name, prev_effective_amount, amt.tx_num, amt.position), (touched,) ) if (name, touched) in self.activated_claim_amount_by_name_and_hash: @@ -1242,12 +1398,18 @@ class BlockProcessor: touched.hex(), height, False, prev_effective_amount, support_amount ) new_effective_amount = self._get_pending_effective_amount(name, touched) - self.db_op_stack.extend_ops( - get_add_effective_amount_ops( - name, new_effective_amount, tx_num, position, touched - ) + self.db.prefix_db.effective_amount.stage_put( + (name, new_effective_amount, tx_num, position), (touched,) ) + for channel_hash, count in self.pending_channel_counts.items(): + if count != 0: + channel_count_val = self.db.prefix_db.channel_count.get(channel_hash) + channel_count = 0 if not channel_count_val else channel_count_val.count + if channel_count_val is not None: + self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,)) + self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,)) + self.touched_claim_hashes.update( {k for k in self.pending_reposted if k not in self.removed_claim_hashes} ) @@ -1319,9 +1481,8 @@ class BlockProcessor: for abandoned_claim_hash, (tx_num, nout, normalized_name) in abandoned_channels.items(): # print(f"\tabandon {normalized_name} {abandoned_claim_hash.hex()} {tx_num} {nout}") self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) - - self.db.total_transactions.append(tx_hash) - self.db.transaction_num_mapping[tx_hash] = tx_count + self.pending_transactions[tx_count] = tx_hash + self.pending_transaction_num_mapping[tx_hash] = tx_count tx_count += 1 # handle expired claims @@ -1333,43 +1494,53 @@ class BlockProcessor: # update effective amount and update sets of touched and deleted claims self._get_cumulative_update_ops(height) - self.db_op_stack.append_op(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count))) + self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) for hashX, new_history in self.hashXs_by_tx.items(): if not new_history: continue - self.db_op_stack.append_op( - RevertablePut( - *Prefixes.hashX_history.pack_item( - hashX, height, new_history - ) - ) - ) + self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,)) self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit if height >= cached_max_reorg_depth: - self.db_op_stack.append_op( - RevertablePut( - *Prefixes.touched_or_deleted.pack_item( - height, self.touched_claim_hashes, self.removed_claim_hashes - ) - ) - ) - self.db_op_stack.append_op( - RevertablePut( - *Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops()) - ) + self.db.prefix_db.touched_or_deleted.stage_put( + key_args=(height,), value_args=(self.touched_claim_hashes, self.removed_claim_hashes) ) self.height = height self.db.headers.append(block.header) self.tip = self.coin.header_hash(block.header) + min_height = self.db.min_undo_height(self.db.db_height) + if min_height > 0: # delete undos for blocks deep enough they can't be reorged + undo_to_delete = list(self.db.prefix_db.undo.iterate(start=(0,), stop=(min_height,))) + for (k, v) in undo_to_delete: + self.db.prefix_db.undo.stage_delete((k,), (v,)) + touched_or_deleted_to_delete = list(self.db.prefix_db.touched_or_deleted.iterate( + start=(0,), stop=(min_height,)) + ) + for (k, v) in touched_or_deleted_to_delete: + self.db.prefix_db.touched_or_deleted.stage_delete(k, v) + + self.db.fs_height = self.height + self.db.fs_tx_count = self.tx_count + self.db.hist_flush_count += 1 + self.db.hist_unflushed_count = 0 + self.db.utxo_flush_count = self.db.hist_flush_count + self.db.db_height = self.height + self.db.db_tx_count = self.tx_count + self.db.db_tip = self.tip + self.db.last_flush_tx_count = self.db.fs_tx_count + now = time.time() + self.db.wall_time += now - self.db.last_flush + self.db.last_flush = now + + self.db.write_db_state() + def clear_after_advance_or_reorg(self): - self.db_op_stack.clear() self.txo_to_claim.clear() self.claim_hash_to_txo.clear() self.support_txos_by_claim.clear() @@ -1399,59 +1570,87 @@ class BlockProcessor: self.pending_channel_counts.clear() self.updated_claims.clear() self.taken_over_names.clear() + self.pending_transaction_num_mapping.clear() + self.pending_transactions.clear() + self.pending_support_amount_change.clear() async def backup_block(self): - # self.db.assert_flushed(self.flush_data()) - self.logger.info("backup block %i", self.height) - # Check and update self.tip - undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height) - if undo_ops is None: - raise ChainError(f'no undo information found for height {self.height:,d}') - self.db_op_stack.append_op(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops)) - self.db_op_stack.apply_packed_undo_ops(undo_ops) - - touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes) + assert len(self.db.prefix_db._op_stack) == 0 + touched_and_deleted = self.db.prefix_db.touched_or_deleted.get(self.height) self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims) self.removed_claims_to_send_es.difference_update(touched_and_deleted.touched_claims) self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims) + # self.db.assert_flushed(self.flush_data()) + self.logger.info("backup block %i", self.height) + # Check and update self.tip + self.db.headers.pop() self.db.tx_counts.pop() self.tip = self.coin.header_hash(self.db.headers[-1]) - while len(self.db.total_transactions) > self.db.tx_counts[-1]: - self.db.transaction_num_mapping.pop(self.db.total_transactions.pop()) - self.tx_count -= 1 + self.tx_count = self.db.tx_counts[-1] self.height -= 1 # self.touched can include other addresses which is # harmless, but remove None. self.touched_hashXs.discard(None) + assert self.height < self.db.db_height + assert not self.db.hist_unflushed + + start_time = time.time() + tx_delta = self.tx_count - self.db.last_flush_tx_count + ### + self.db.fs_tx_count = self.tx_count + # Truncate header_mc: header count is 1 more than the height. + self.db.header_mc.truncate(self.height + 1) + ### + # Not certain this is needed, but it doesn't hurt + self.db.hist_flush_count += 1 + + while self.db.fs_height > self.height: + self.db.fs_height -= 1 + self.db.utxo_flush_count = self.db.hist_flush_count + self.db.db_height = self.height + self.db.db_tx_count = self.tx_count + self.db.db_tip = self.tip + # Flush state last as it reads the wall time. + now = time.time() + self.db.wall_time += now - self.db.last_flush + self.db.last_flush = now + self.db.last_flush_tx_count = self.db.fs_tx_count + + await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1) + self.clear_after_advance_or_reorg() + + elapsed = self.db.last_flush - start_time + self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. ' + f'Height {self.height:,d} txs: {self.tx_count:,d} ({tx_delta:+,d})') + def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]: hashX = self.coin.hashX_from_script(txout.pk_script) if hashX: self.touched_hashXs.add(hashX) self.utxo_cache[(tx_hash, nout)] = (hashX, txout.value) - self.db_op_stack.extend_ops([ - RevertablePut( - *Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value) - ), - RevertablePut( - *Prefixes.hashX_utxo.pack_item(tx_hash[:4], tx_num, nout, hashX) - ) - ]) + self.db.prefix_db.utxo.stage_put((hashX, tx_num, nout), (txout.value,)) + self.db.prefix_db.hashX_utxo.stage_put((tx_hash[:4], tx_num, nout), (hashX,)) return hashX + def get_pending_tx_num(self, tx_hash: bytes) -> int: + if tx_hash in self.pending_transaction_num_mapping: + return self.pending_transaction_num_mapping[tx_hash] + else: + return self.db.prefix_db.tx_num.get(tx_hash).tx_num + def spend_utxo(self, tx_hash: bytes, nout: int): hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None)) - txin_num = self.db.transaction_num_mapping[tx_hash] - hdb_key = Prefixes.hashX_utxo.pack_key(tx_hash[:4], txin_num, nout) + txin_num = self.get_pending_tx_num(tx_hash) if not hashX: - hashX = self.db.db.get(hdb_key) - if not hashX: + hashX_value = self.db.prefix_db.hashX_utxo.get(tx_hash[:4], txin_num, nout) + if not hashX_value: return - udb_key = Prefixes.utxo.pack_key(hashX, txin_num, nout) - utxo_value_packed = self.db.db.get(udb_key) - if utxo_value_packed is None: + hashX = hashX_value.hashX + utxo_value = self.db.prefix_db.utxo.get(hashX, txin_num, nout) + if not utxo_value: self.logger.warning( "%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX) ) @@ -1459,18 +1658,13 @@ class BlockProcessor: f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}" ) self.touched_hashXs.add(hashX) - self.db_op_stack.extend_ops([ - RevertableDelete(hdb_key, hashX), - RevertableDelete(udb_key, utxo_value_packed) - ]) + self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), hashX_value) + self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), utxo_value) return hashX elif amount is not None: - udb_key = Prefixes.utxo.pack_key(hashX, txin_num, nout) + self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), (hashX,)) + self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), (amount,)) self.touched_hashXs.add(hashX) - self.db_op_stack.extend_ops([ - RevertableDelete(hdb_key, hashX), - RevertableDelete(udb_key, Prefixes.utxo.pack_value(amount)) - ]) return hashX async def _process_prefetched_blocks(self): @@ -1494,7 +1688,15 @@ class BlockProcessor: # Flush everything but with first_sync->False state. first_sync = self.db.first_sync self.db.first_sync = False - await self.write_state() + + def flush(): + assert len(self.db.prefix_db._op_stack) == 0 + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + self.db.assert_db_state() + + await self.run_in_thread_with_lock(flush) + if first_sync: self.logger.info(f'{lbry.__version__} synced to ' f'height {self.height:,d}, halting here.') @@ -1516,7 +1718,6 @@ class BlockProcessor: self._caught_up_event = caught_up_event try: await self.db.open_dbs() - self.db_op_stack = self.db.db_op_stack self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index f2c40697b..b3201dc79 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -37,4 +37,5 @@ class DB_PREFIXES(enum.Enum): hashx_utxo = b'h' hashx_history = b'x' db_state = b's' - trending_spike = b't' + channel_count = b'Z' + support_amount = b'a' diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py deleted file mode 100644 index 54a65484d..000000000 --- a/lbry/wallet/server/db/claimtrie.py +++ /dev/null @@ -1,258 +0,0 @@ -import typing -from typing import Optional -from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp -from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow -from lbry.wallet.server.db.prefixes import RepostPrefixRow, RepostedPrefixRow - - -def length_encoded_name(name: str) -> bytes: - encoded = name.encode('utf-8') - return len(encoded).to_bytes(2, byteorder='big') + encoded - - -class StagedClaimtrieSupport(typing.NamedTuple): - claim_hash: bytes - tx_num: int - position: int - amount: int - - def _get_add_remove_support_utxo_ops(self, add=True): - """ - get a list of revertable operations to add or spend a support txo to the key: value database - - :param add: if true use RevertablePut operations, otherwise use RevertableDelete - :return: - """ - op = RevertablePut if add else RevertableDelete - return [ - op( - *Prefixes.claim_to_support.pack_item(self.claim_hash, self.tx_num, self.position, self.amount) - ), - op( - *Prefixes.support_to_claim.pack_item(self.tx_num, self.position, self.claim_hash) - ) - ] - - def get_add_support_utxo_ops(self) -> typing.List[RevertableOp]: - return self._get_add_remove_support_utxo_ops(add=True) - - def get_spend_support_txo_ops(self) -> typing.List[RevertableOp]: - return self._get_add_remove_support_utxo_ops(add=False) - - -class StagedActivation(typing.NamedTuple): - txo_type: int - claim_hash: bytes - tx_num: int - position: int - activation_height: int - name: str - amount: int - - def _get_add_remove_activate_ops(self, add=True): - op = RevertablePut if add else RevertableDelete - # print(f"\t{'add' if add else 'remove'} {'claim' if self.txo_type == ACTIVATED_CLAIM_TXO_TYPE else 'support'}," - # f" {self.tx_num}, {self.position}, activation={self.activation_height}, {self.name}, " - # f"amount={self.amount}") - return [ - op( - *Prefixes.activated.pack_item( - self.txo_type, self.tx_num, self.position, self.activation_height, self.claim_hash, self.name - ) - ), - op( - *Prefixes.pending_activation.pack_item( - self.activation_height, self.txo_type, self.tx_num, self.position, - self.claim_hash, self.name - ) - ), - op( - *Prefixes.active_amount.pack_item( - self.claim_hash, self.txo_type, self.activation_height, self.tx_num, self.position, self.amount - ) - ) - ] - - def get_activate_ops(self) -> typing.List[RevertableOp]: - return self._get_add_remove_activate_ops(add=True) - - def get_remove_activate_ops(self) -> typing.List[RevertableOp]: - return self._get_add_remove_activate_ops(add=False) - - -def get_remove_name_ops(name: str, claim_hash: bytes, height: int) -> typing.List[RevertableDelete]: - return [ - RevertableDelete( - *Prefixes.claim_takeover.pack_item( - name, claim_hash, height - ) - ) - ] - - -def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, - previous_winning: Optional[ClaimTakeoverValue]): - if previous_winning: - return [ - RevertableDelete( - *Prefixes.claim_takeover.pack_item( - name, previous_winning.claim_hash, previous_winning.height - ) - ), - RevertablePut( - *Prefixes.claim_takeover.pack_item( - name, claim_hash, takeover_height - ) - ) - ] - return [ - RevertablePut( - *Prefixes.claim_takeover.pack_item( - name, claim_hash, takeover_height - ) - ) - ] - - -def get_remove_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes): - return [ - RevertableDelete(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash)) - ] - - -def get_add_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes): - return [ - RevertablePut(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash)) - ] - - -class StagedClaimtrieItem(typing.NamedTuple): - name: str - normalized_name: str - claim_hash: bytes - amount: int - expiration_height: int - tx_num: int - position: int - root_tx_num: int - root_position: int - channel_signature_is_valid: bool - signing_hash: Optional[bytes] - reposted_claim_hash: Optional[bytes] - - @property - def is_update(self) -> bool: - return (self.tx_num, self.position) != (self.root_tx_num, self.root_position) - - def _get_add_remove_claim_utxo_ops(self, add=True): - """ - get a list of revertable operations to add or spend a claim txo to the key: value database - - :param add: if true use RevertablePut operations, otherwise use RevertableDelete - :return: - """ - op = RevertablePut if add else RevertableDelete - ops = [ - # claim tip by claim hash - op( - *Prefixes.claim_to_txo.pack_item( - self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position, - self.amount, self.channel_signature_is_valid, self.name - ) - ), - # claim hash by txo - op( - *Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.normalized_name) - ), - # claim expiration - op( - *Prefixes.claim_expiration.pack_item( - self.expiration_height, self.tx_num, self.position, self.claim_hash, - self.normalized_name - ) - ), - # short url resolution - ] - ops.extend([ - op( - *Prefixes.claim_short_id.pack_item( - self.normalized_name, self.claim_hash.hex()[:prefix_len + 1], self.root_tx_num, self.root_position, - self.tx_num, self.position - ) - ) for prefix_len in range(10) - ]) - - if self.signing_hash and self.channel_signature_is_valid: - ops.extend([ - # channel by stream - op( - *Prefixes.claim_to_channel.pack_item( - self.claim_hash, self.tx_num, self.position, self.signing_hash - ) - ), - # stream by channel - op( - *Prefixes.channel_to_claim.pack_item( - self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash - ) - ) - ]) - if self.reposted_claim_hash: - ops.extend([ - op( - *Prefixes.repost.pack_item(self.claim_hash, self.reposted_claim_hash) - ), - op( - *Prefixes.reposted_claim.pack_item( - self.reposted_claim_hash, self.tx_num, self.position, self.claim_hash - ) - ), - - ]) - return ops - - def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]: - return self._get_add_remove_claim_utxo_ops(add=True) - - def get_spend_claim_txo_ops(self) -> typing.List[RevertableOp]: - return self._get_add_remove_claim_utxo_ops(add=False) - - def get_invalidate_signature_ops(self): - if not self.signing_hash: - return [] - ops = [ - RevertableDelete( - *Prefixes.claim_to_channel.pack_item( - self.claim_hash, self.tx_num, self.position, self.signing_hash - ) - ) - ] - if self.channel_signature_is_valid: - ops.extend([ - # delete channel_to_claim/claim_to_channel - RevertableDelete( - *Prefixes.channel_to_claim.pack_item( - self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash - ) - ), - # update claim_to_txo with channel_signature_is_valid=False - RevertableDelete( - *Prefixes.claim_to_txo.pack_item( - self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position, - self.amount, self.channel_signature_is_valid, self.name - ) - ), - RevertablePut( - *Prefixes.claim_to_txo.pack_item( - self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position, - self.amount, False, self.name - ) - ) - ]) - return ops - - def invalidate_signature(self) -> 'StagedClaimtrieItem': - return StagedClaimtrieItem( - self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, - self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash - ) diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py new file mode 100644 index 000000000..f8bce3dc1 --- /dev/null +++ b/lbry/wallet/server/db/db.py @@ -0,0 +1,103 @@ +import struct +from typing import Optional +from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db.revertable import RevertableOpStack + + +class KeyValueStorage: + def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]: + raise NotImplemented() + + def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None, + include_key=True, include_value=True, fill_cache=True): + raise NotImplemented() + + def write_batch(self, transaction: bool = False): + raise NotImplemented() + + def close(self): + raise NotImplemented() + + @property + def closed(self) -> bool: + raise NotImplemented() + + +class PrefixDB: + UNDO_KEY_STRUCT = struct.Struct(b'>Q') + + def __init__(self, db: KeyValueStorage, unsafe_prefixes=None): + self._db = db + self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes) + + def unsafe_commit(self): + """ + Write staged changes to the database without keeping undo information + Changes written cannot be undone + """ + try: + with self._db.write_batch(transaction=True) as batch: + batch_put = batch.put + batch_delete = batch.delete + for staged_change in self._op_stack: + if staged_change.is_put: + batch_put(staged_change.key, staged_change.value) + else: + batch_delete(staged_change.key) + finally: + self._op_stack.clear() + + def commit(self, height: int): + """ + Write changes for a block height to the database and keep undo information so that the changes can be reverted + """ + undo_ops = self._op_stack.get_undo_ops() + try: + with self._db.write_batch(transaction=True) as batch: + batch_put = batch.put + batch_delete = batch.delete + for staged_change in self._op_stack: + if staged_change.is_put: + batch_put(staged_change.key, staged_change.value) + else: + batch_delete(staged_change.key) + batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops) + finally: + self._op_stack.clear() + + def rollback(self, height: int): + """ + Revert changes for a block height + """ + undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height) + self._op_stack.apply_packed_undo_ops(self._db.get(undo_key)) + try: + with self._db.write_batch(transaction=True) as batch: + batch_put = batch.put + batch_delete = batch.delete + for staged_change in self._op_stack: + if staged_change.is_put: + batch_put(staged_change.key, staged_change.value) + else: + batch_delete(staged_change.key) + batch_delete(undo_key) + finally: + self._op_stack.clear() + + def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]: + return self._db.get(key, fill_cache=fill_cache) + + def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None, + include_key=True, include_value=True, fill_cache=True): + return self._db.iterator( + reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop, + prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache + ) + + def close(self): + if not self._db.closed: + self._db.close() + + @property + def closed(self): + return self._db.closed diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index b86a38dd2..c7ec9c3a9 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -2,9 +2,9 @@ import typing import struct import array import base64 -import plyvel from typing import Union, Tuple, NamedTuple, Optional from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.schema.url import normalize_name @@ -38,13 +38,13 @@ class PrefixRow(metaclass=PrefixRowType): value_struct: struct.Struct key_part_lambdas = [] - def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack): + def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack): self._db = db self._op_stack = op_stack def iterate(self, prefix=None, start=None, stop=None, reverse: bool = False, include_key: bool = True, include_value: bool = True, - fill_cache: bool = True): + fill_cache: bool = True, deserialize_key: bool = True, deserialize_value: bool = True): if not prefix and not start and not stop: prefix = () if prefix is not None: @@ -54,25 +54,36 @@ class PrefixRow(metaclass=PrefixRowType): if stop is not None: stop = self.pack_partial_key(*stop) + if deserialize_key: + key_getter = lambda k: self.unpack_key(k) + else: + key_getter = lambda k: k + if deserialize_value: + value_getter = lambda v: self.unpack_value(v) + else: + value_getter = lambda v: v + if include_key and include_value: for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, fill_cache=fill_cache): - yield self.unpack_key(k), self.unpack_value(v) + yield key_getter(k), value_getter(v) elif include_key: for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False, fill_cache=fill_cache): - yield self.unpack_key(k) + yield key_getter(k) elif include_value: for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False, fill_cache=fill_cache): - yield self.unpack_value(v) + yield value_getter(v) else: - raise RuntimeError + for _ in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False, + include_value=False, fill_cache=fill_cache): + yield None - def get(self, *key_args, fill_cache=True): + def get(self, *key_args, fill_cache=True, deserialize_value=True): v = self._db.get(self.pack_key(*key_args), fill_cache=fill_cache) if v: - return self.unpack_value(v) + return v if not deserialize_value else self.unpack_value(v) def stage_put(self, key_args=(), value_args=()): self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))) @@ -303,6 +314,28 @@ class ChannelToClaimValue(typing.NamedTuple): return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})" +class ChannelCountKey(typing.NamedTuple): + channel_hash: bytes + + def __str__(self): + return f"{self.__class__.__name__}(channel_hash={self.channel_hash.hex()})" + + +class ChannelCountValue(typing.NamedTuple): + count: int + + +class SupportAmountKey(typing.NamedTuple): + claim_hash: bytes + + def __str__(self): + return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})" + + +class SupportAmountValue(typing.NamedTuple): + amount: int + + class ClaimToSupportKey(typing.NamedTuple): claim_hash: bytes tx_num: int @@ -469,6 +502,20 @@ class TouchedOrDeletedClaimValue(typing.NamedTuple): f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})" +class DBState(typing.NamedTuple): + genesis: bytes + height: int + tx_count: int + tip: bytes + utxo_flush_count: int + wall_time: int + first_sync: bool + db_version: int + hist_flush_count: int + comp_flush_count: int + comp_cursor: int + + class ActiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.active_amount.value key_struct = struct.Struct(b'>20sBLLH') @@ -514,9 +561,7 @@ class ClaimToTXOPrefixRow(PrefixRow): @classmethod def pack_key(cls, claim_hash: bytes): - return super().pack_key( - claim_hash - ) + return super().pack_key(claim_hash) @classmethod def unpack_key(cls, key: bytes) -> ClaimToTXOKey: @@ -972,6 +1017,11 @@ class EffectiveAmountPrefixRow(PrefixRow): class RepostPrefixRow(PrefixRow): prefix = DB_PREFIXES.repost.value + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] + @classmethod def pack_key(cls, claim_hash: bytes): return cls.prefix + claim_hash @@ -1031,6 +1081,11 @@ class UndoPrefixRow(PrefixRow): prefix = DB_PREFIXES.undo.value key_struct = struct.Struct(b'>Q') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>Q').pack + ] + @classmethod def pack_key(cls, height: int): return super().pack_key(height) @@ -1059,6 +1114,11 @@ class BlockHashPrefixRow(PrefixRow): key_struct = struct.Struct(b'>L') value_struct = struct.Struct(b'>32s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack + ] + @classmethod def pack_key(cls, height: int) -> bytes: return super().pack_key(height) @@ -1085,6 +1145,11 @@ class BlockHeaderPrefixRow(PrefixRow): key_struct = struct.Struct(b'>L') value_struct = struct.Struct(b'>112s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack + ] + @classmethod def pack_key(cls, height: int) -> bytes: return super().pack_key(height) @@ -1111,6 +1176,11 @@ class TXNumPrefixRow(PrefixRow): key_struct = struct.Struct(b'>32s') value_struct = struct.Struct(b'>L') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>32s').pack + ] + @classmethod def pack_key(cls, tx_hash: bytes) -> bytes: return super().pack_key(tx_hash) @@ -1137,6 +1207,11 @@ class TxCountPrefixRow(PrefixRow): key_struct = struct.Struct(b'>L') value_struct = struct.Struct(b'>L') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack + ] + @classmethod def pack_key(cls, height: int) -> bytes: return super().pack_key(height) @@ -1163,6 +1238,11 @@ class TXHashPrefixRow(PrefixRow): key_struct = struct.Struct(b'>L') value_struct = struct.Struct(b'>32s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack + ] + @classmethod def pack_key(cls, tx_num: int) -> bytes: return super().pack_key(tx_num) @@ -1188,6 +1268,11 @@ class TXPrefixRow(PrefixRow): prefix = DB_PREFIXES.tx.value key_struct = struct.Struct(b'>32s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>32s').pack + ] + @classmethod def pack_key(cls, tx_hash: bytes) -> bytes: return super().pack_key(tx_hash) @@ -1313,6 +1398,10 @@ class TouchedOrDeletedPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_diff.value key_struct = struct.Struct(b'>L') value_struct = struct.Struct(b'>LL') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack + ] @classmethod def pack_key(cls, height: int): @@ -1324,16 +1413,19 @@ class TouchedOrDeletedPrefixRow(PrefixRow): @classmethod def pack_value(cls, touched, deleted) -> bytes: + assert True if not touched else all(len(item) == 20 for item in touched) + assert True if not deleted else all(len(item) == 20 for item in deleted) return cls.value_struct.pack(len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted) @classmethod def unpack_value(cls, data: bytes) -> TouchedOrDeletedClaimValue: touched_len, deleted_len = cls.value_struct.unpack(data[:8]) - assert len(data) == 20 * (touched_len + deleted_len) + 8 - touched_bytes, deleted_bytes = data[8:touched_len*20+8], data[touched_len*20+8:touched_len*20+deleted_len*20+8] + data = data[8:] + assert len(data) == 20 * (touched_len + deleted_len) + touched_bytes, deleted_bytes = data[:touched_len*20], data[touched_len*20:] return TouchedOrDeletedClaimValue( - {touched_bytes[8+20*i:8+20*(i+1)] for i in range(touched_len)}, - {deleted_bytes[8+20*i:8+20*(i+1)] for i in range(deleted_len)} + {touched_bytes[20*i:20*(i+1)] for i in range(touched_len)}, + {deleted_bytes[20*i:20*(i+1)] for i in range(deleted_len)} ) @classmethod @@ -1341,87 +1433,170 @@ class TouchedOrDeletedPrefixRow(PrefixRow): return cls.pack_key(height), cls.pack_value(touched, deleted) -class Prefixes: - claim_to_support = ClaimToSupportPrefixRow - support_to_claim = SupportToClaimPrefixRow +class ChannelCountPrefixRow(PrefixRow): + prefix = DB_PREFIXES.channel_count.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'>L') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] - claim_to_txo = ClaimToTXOPrefixRow - txo_to_claim = TXOToClaimPrefixRow + @classmethod + def pack_key(cls, channel_hash: int): + return super().pack_key(channel_hash) - claim_to_channel = ClaimToChannelPrefixRow - channel_to_claim = ChannelToClaimPrefixRow + @classmethod + def unpack_key(cls, key: bytes) -> ChannelCountKey: + return ChannelCountKey(*super().unpack_key(key)) - claim_short_id = ClaimShortIDPrefixRow - claim_expiration = ClaimExpirationPrefixRow + @classmethod + def pack_value(cls, count: int) -> bytes: + return super().pack_value(count) - claim_takeover = ClaimTakeoverPrefixRow - pending_activation = PendingActivationPrefixRow - activated = ActivatedPrefixRow - active_amount = ActiveAmountPrefixRow + @classmethod + def unpack_value(cls, data: bytes) -> ChannelCountValue: + return ChannelCountValue(*super().unpack_value(data)) - effective_amount = EffectiveAmountPrefixRow - - repost = RepostPrefixRow - reposted_claim = RepostedPrefixRow - - undo = UndoPrefixRow - utxo = UTXOPrefixRow - hashX_utxo = HashXUTXOPrefixRow - hashX_history = HashXHistoryPrefixRow - block_hash = BlockHashPrefixRow - tx_count = TxCountPrefixRow - tx_hash = TXHashPrefixRow - tx_num = TXNumPrefixRow - tx = TXPrefixRow - header = BlockHeaderPrefixRow - touched_or_deleted = TouchedOrDeletedPrefixRow + @classmethod + def pack_item(cls, channel_hash, count): + return cls.pack_key(channel_hash), cls.pack_value(count) -class PrefixDB: - def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack): - self._db = db - self._op_stack = op_stack +class SupportAmountPrefixRow(PrefixRow): + prefix = DB_PREFIXES.support_amount.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'>Q') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] - self.claim_to_support = ClaimToSupportPrefixRow(db, op_stack) - self.support_to_claim = SupportToClaimPrefixRow(db, op_stack) - self.claim_to_txo = ClaimToTXOPrefixRow(db, op_stack) - self.txo_to_claim = TXOToClaimPrefixRow(db, op_stack) - self.claim_to_channel = ClaimToChannelPrefixRow(db, op_stack) - self.channel_to_claim = ChannelToClaimPrefixRow(db, op_stack) - self.claim_short_id = ClaimShortIDPrefixRow(db, op_stack) - self.claim_expiration = ClaimExpirationPrefixRow(db, op_stack) - self.claim_takeover = ClaimTakeoverPrefixRow(db, op_stack) - self.pending_activation = PendingActivationPrefixRow(db, op_stack) - self.activated = ActivatedPrefixRow(db, op_stack) - self.active_amount = ActiveAmountPrefixRow(db, op_stack) - self.effective_amount = EffectiveAmountPrefixRow(db, op_stack) - self.repost = RepostPrefixRow(db, op_stack) - self.reposted_claim = RepostedPrefixRow(db, op_stack) - self.undo = UndoPrefixRow(db, op_stack) - self.utxo = UTXOPrefixRow(db, op_stack) - self.hashX_utxo = HashXUTXOPrefixRow(db, op_stack) - self.hashX_history = HashXHistoryPrefixRow(db, op_stack) - self.block_hash = BlockHashPrefixRow(db, op_stack) - self.tx_count = TxCountPrefixRow(db, op_stack) - self.tx_hash = TXHashPrefixRow(db, op_stack) - self.tx_num = TXNumPrefixRow(db, op_stack) - self.tx = TXPrefixRow(db, op_stack) - self.header = BlockHeaderPrefixRow(db, op_stack) - self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack) + @classmethod + def pack_key(cls, claim_hash: bytes): + return super().pack_key(claim_hash) - def commit(self): - try: - with self._db.write_batch(transaction=True) as batch: - batch_put = batch.put - batch_delete = batch.delete + @classmethod + def unpack_key(cls, key: bytes) -> SupportAmountKey: + return SupportAmountKey(*super().unpack_key(key)) - for staged_change in self._op_stack: - if staged_change.is_put: - batch_put(staged_change.key, staged_change.value) - else: - batch_delete(staged_change.key) - finally: - self._op_stack.clear() + @classmethod + def pack_value(cls, amount: int) -> bytes: + return super().pack_value(amount) + + @classmethod + def unpack_value(cls, data: bytes) -> SupportAmountValue: + return SupportAmountValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, claim_hash, amount): + return cls.pack_key(claim_hash), cls.pack_value(amount) + + +class DBStatePrefixRow(PrefixRow): + prefix = DB_PREFIXES.db_state.value + value_struct = struct.Struct(b'>32sLL32sLLBBlll') + key_struct = struct.Struct(b'') + + key_part_lambdas = [ + lambda: b'' + ] + + @classmethod + def pack_key(cls) -> bytes: + return cls.prefix + + @classmethod + def unpack_key(cls, key: bytes): + return + + @classmethod + def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, + first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, + comp_cursor: int) -> bytes: + return super().pack_value( + genesis, height, tx_count, tip, utxo_flush_count, + wall_time, 1 if first_sync else 0, db_version, hist_flush_count, + comp_flush_count, comp_cursor + ) + + @classmethod + def unpack_value(cls, data: bytes) -> DBState: + return DBState(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, + first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, + comp_cursor: int): + return cls.pack_key(), cls.pack_value( + genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync, db_version, hist_flush_count, + comp_flush_count, comp_cursor + ) + + +class LevelDBStore(KeyValueStorage): + def __init__(self, path: str, cache_mb: int, max_open_files: int): + import plyvel + self.db = plyvel.DB( + path, create_if_missing=True, max_open_files=max_open_files, + lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, + max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 + ) + + def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]: + return self.db.get(key, fill_cache=fill_cache) + + def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None, + include_key=True, include_value=True, fill_cache=True): + return self.db.iterator( + reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop, + prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache + ) + + def write_batch(self, transaction: bool = False, sync: bool = False): + return self.db.write_batch(transaction=transaction, sync=sync) + + def close(self): + return self.db.close() + + @property + def closed(self) -> bool: + return self.db.closed + + +class HubDB(PrefixDB): + def __init__(self, path: str, cache_mb: int, max_open_files: int = 512): + db = LevelDBStore(path, cache_mb, max_open_files) + super().__init__(db, unsafe_prefixes={DB_PREFIXES.db_state.value}) + self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack) + self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack) + self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack) + self.txo_to_claim = TXOToClaimPrefixRow(db, self._op_stack) + self.claim_to_channel = ClaimToChannelPrefixRow(db, self._op_stack) + self.channel_to_claim = ChannelToClaimPrefixRow(db, self._op_stack) + self.claim_short_id = ClaimShortIDPrefixRow(db, self._op_stack) + self.claim_expiration = ClaimExpirationPrefixRow(db, self._op_stack) + self.claim_takeover = ClaimTakeoverPrefixRow(db, self._op_stack) + self.pending_activation = PendingActivationPrefixRow(db, self._op_stack) + self.activated = ActivatedPrefixRow(db, self._op_stack) + self.active_amount = ActiveAmountPrefixRow(db, self._op_stack) + self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack) + self.repost = RepostPrefixRow(db, self._op_stack) + self.reposted_claim = RepostedPrefixRow(db, self._op_stack) + self.undo = UndoPrefixRow(db, self._op_stack) + self.utxo = UTXOPrefixRow(db, self._op_stack) + self.hashX_utxo = HashXUTXOPrefixRow(db, self._op_stack) + self.hashX_history = HashXHistoryPrefixRow(db, self._op_stack) + self.block_hash = BlockHashPrefixRow(db, self._op_stack) + self.tx_count = TxCountPrefixRow(db, self._op_stack) + self.tx_hash = TXHashPrefixRow(db, self._op_stack) + self.tx_num = TXNumPrefixRow(db, self._op_stack) + self.tx = TXPrefixRow(db, self._op_stack) + self.header = BlockHeaderPrefixRow(db, self._op_stack) + self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, self._op_stack) + self.channel_count = ChannelCountPrefixRow(db, self._op_stack) + self.db_state = DBStatePrefixRow(db, self._op_stack) + self.support_amount = SupportAmountPrefixRow(db, self._op_stack) def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index ee226cac2..2c7f7f781 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -14,10 +14,8 @@ import array import time import typing import struct -import attr import zlib import base64 -import plyvel from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List, TYPE_CHECKING from functools import partial from asyncio import sleep @@ -32,10 +30,8 @@ from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str from lbry.wallet.server.tx import TxInput from lbry.wallet.server.merkle import Merkle, MerkleCache -from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES -from lbry.wallet.server.db.revertable import RevertableOpStack -from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB +from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue from lbry.wallet.transaction import OutputScript @@ -59,46 +55,8 @@ class UTXO(typing.NamedTuple): TXO_STRUCT = struct.Struct(b'>LH') TXO_STRUCT_unpack = TXO_STRUCT.unpack TXO_STRUCT_pack = TXO_STRUCT.pack - - -@attr.s(slots=True) -class FlushData: - height = attr.ib() - tx_count = attr.ib() - put_and_delete_ops = attr.ib() - tip = attr.ib() - - OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]] -DB_STATE_STRUCT = struct.Struct(b'>32sLL32sLLBBlll') -DB_STATE_STRUCT_SIZE = 94 - - -class DBState(typing.NamedTuple): - genesis: bytes - height: int - tx_count: int - tip: bytes - utxo_flush_count: int - wall_time: int - first_sync: bool - db_version: int - hist_flush_count: int - comp_flush_count: int - comp_cursor: int - - def pack(self) -> bytes: - return DB_STATE_STRUCT.pack( - self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count, - self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count, - self.comp_flush_count, self.comp_cursor - ) - - @classmethod - def unpack(cls, packed: bytes) -> 'DBState': - return cls(*DB_STATE_STRUCT.unpack(packed[:DB_STATE_STRUCT_SIZE])) - class DBError(Exception): """Raised on general DB errors generally indicating corruption.""" @@ -114,7 +72,6 @@ class LevelDB: self.logger.info(f'switching current directory to {env.db_dir}') - self.db = None self.prefix_db = None self.hist_unflushed = defaultdict(partial(array.array, 'I')) @@ -149,8 +106,6 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") - self.total_transactions = None - self.transaction_num_mapping = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) @@ -173,65 +128,57 @@ class LevelDB: self.ledger = RegTestLedger def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]: - claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx)) + claim_hash_and_name = self.prefix_db.txo_to_claim.get(tx_num, tx_idx) if not claim_hash_and_name: return - return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name) + return claim_hash_and_name def get_repost(self, claim_hash) -> Optional[bytes]: - repost = self.db.get(Prefixes.repost.pack_key(claim_hash)) + repost = self.prefix_db.repost.get(claim_hash) if repost: - return Prefixes.repost.unpack_value(repost).reposted_claim_hash + return repost.reposted_claim_hash return def get_reposted_count(self, claim_hash: bytes) -> int: - cnt = 0 - for _ in self.db.iterator(prefix=Prefixes.reposted_claim.pack_partial_key(claim_hash)): - cnt += 1 - return cnt + return sum( + 1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False) + ) def get_activation(self, tx_num, position, is_support=False) -> int: - activation = self.db.get( - Prefixes.activated.pack_key( - ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position - ) + activation = self.prefix_db.activated.get( + ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position ) if activation: - return Prefixes.activated.unpack_value(activation).height + return activation.height return -1 def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]: - key = Prefixes.support_to_claim.pack_key(tx_num, position) - supported_claim_hash = self.db.get(key) + supported_claim_hash = self.prefix_db.support_to_claim.get(tx_num, position) if supported_claim_hash: - packed_support_amount = self.db.get( - Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, position) + packed_support_amount = self.prefix_db.claim_to_support.get( + supported_claim_hash.claim_hash, tx_num, position ) if packed_support_amount: - return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount + return supported_claim_hash.claim_hash, packed_support_amount.amount return None, None def get_support_amount(self, claim_hash: bytes): - total = 0 - for packed in self.db.iterator(prefix=Prefixes.claim_to_support.pack_partial_key(claim_hash), include_key=False): - total += Prefixes.claim_to_support.unpack_value(packed).amount - return total + support_amount_val = self.prefix_db.support_amount.get(claim_hash) + if support_amount_val is None: + return 0 + return support_amount_val.amount def get_supports(self, claim_hash: bytes): - supports = [] - for k, v in self.db.iterator(prefix=Prefixes.claim_to_support.pack_partial_key(claim_hash)): - unpacked_k = Prefixes.claim_to_support.unpack_key(k) - unpacked_v = Prefixes.claim_to_support.unpack_value(v) - supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount)) - return supports + return [ + (k.tx_num, k.position, v.amount) for k, v in self.prefix_db.claim_to_support.iterate(prefix=(claim_hash,)) + ] def get_short_claim_id_url(self, name: str, normalized_name: str, claim_hash: bytes, root_tx_num: int, root_position: int) -> str: claim_id = claim_hash.hex() for prefix_len in range(10): - prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:prefix_len+1]) - for _k in self.db.iterator(prefix=prefix, include_value=False): - k = Prefixes.claim_short_id.unpack_key(_k) + for k in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:prefix_len+1]), + include_value=False): if k.root_tx_num == root_tx_num and k.root_position == root_position: return f'{name}#{k.partial_claim_id}' break @@ -247,7 +194,7 @@ class LevelDB: normalized_name = name controlling_claim = self.get_controlling_claim(normalized_name) - tx_hash = self.total_transactions[tx_num] + tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) height = bisect_right(self.tx_counts, tx_num) created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = controlling_claim.height @@ -314,10 +261,7 @@ class LevelDB: self.get_activation(claim_txo.tx_num, claim_txo.position), claim_txo.channel_signature_is_valid ) # resolve by partial/complete claim id - prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:10]) - for k, v in self.db.iterator(prefix=prefix): - key = Prefixes.claim_short_id.unpack_key(k) - claim_txo = Prefixes.claim_short_id.unpack_value(v) + for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])): claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position] non_normalized_name = self.claim_to_txo.get(claim_hash).name signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid @@ -329,12 +273,9 @@ class LevelDB: return # resolve by amount ordering, 1 indexed - prefix = Prefixes.effective_amount.pack_partial_key(normalized_name) - for idx, (k, v) in enumerate(self.db.iterator(prefix=prefix)): + for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))): if amount_order > idx + 1: continue - key = Prefixes.effective_amount.unpack_key(k) - claim_val = Prefixes.effective_amount.unpack_value(v) claim_txo = self.claim_to_txo.get(claim_val.claim_hash) activation = self.get_activation(key.tx_num, key.position) return self._prepare_resolve_result( @@ -345,9 +286,7 @@ class LevelDB: def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str): candidates = [] - for k, v in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash, normalized_name)): - key = Prefixes.channel_to_claim.unpack_key(k) - stream = Prefixes.channel_to_claim.unpack_value(v) + for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)): effective_amount = self.get_effective_amount(stream.claim_hash) if not candidates or candidates[-1][-1] == effective_amount: candidates.append((stream.claim_hash, key.tx_num, key.position, effective_amount)) @@ -362,7 +301,7 @@ class LevelDB: try: parsed = URL.parse(url) except ValueError as e: - return e, None + return e, None, None stream = channel = resolved_channel = resolved_stream = None if parsed.has_stream_in_channel: @@ -428,9 +367,9 @@ class LevelDB: return claim.amount def get_block_hash(self, height: int) -> Optional[bytes]: - v = self.db.get(Prefixes.block_hash.pack_key(height)) + v = self.prefix_db.block_hash.get(height) if v: - return Prefixes.block_hash.unpack_value(v).block_hash + return v.block_hash def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: v = self.prefix_db.claim_to_support.get(claim_hash, tx_num, position) @@ -467,19 +406,19 @@ class LevelDB: def get_claims_for_name(self, name): claims = [] - prefix = Prefixes.claim_short_id.pack_partial_key(name) + bytes([1]) - for _k, _v in self.db.iterator(prefix=prefix): - v = Prefixes.claim_short_id.unpack_value(_v) + prefix = self.prefix_db.claim_short_id.pack_partial_key(name) + bytes([1]) + for _k, _v in self.prefix_db.iterator(prefix=prefix): + v = self.prefix_db.claim_short_id.unpack_value(_v) claim_hash = self.get_claim_from_txo(v.tx_num, v.position).claim_hash if claim_hash not in claims: claims.append(claim_hash) return claims def get_claims_in_channel_count(self, channel_hash) -> int: - count = 0 - for _ in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash,), include_key=False): - count += 1 - return count + channel_count_val = self.prefix_db.channel_count.get(channel_hash) + if channel_count_val is None: + return 0 + return channel_count_val.count async def reload_blocking_filtering_streams(self): def reload(): @@ -506,14 +445,15 @@ class LevelDB: return streams, channels def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: - return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position)) + v = self.prefix_db.claim_to_channel.get(claim_hash, tx_num, position) + if v: + return v.signing_hash def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]: expired = {} - for _k, _v in self.db.iterator(prefix=Prefixes.claim_expiration.pack_partial_key(height)): - k, v = Prefixes.claim_expiration.unpack_item(_k, _v) - tx_hash = self.total_transactions[k.tx_num] - tx = self.coin.transaction(self.db.get(Prefixes.tx.pack_key(tx_hash))) + for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)): + tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False) + tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False)) # treat it like a claim spend so it will delete/abandon properly # the _spend_claim function this result is fed to expects a txi, so make a mock one # print(f"\texpired lbry://{v.name} {v.claim_hash.hex()}") @@ -524,21 +464,21 @@ class LevelDB: return expired def get_controlling_claim(self, name: str) -> Optional[ClaimTakeoverValue]: - controlling = self.db.get(Prefixes.claim_takeover.pack_key(name)) + controlling = self.prefix_db.claim_takeover.get(name) if not controlling: return - return Prefixes.claim_takeover.unpack_value(controlling) + return controlling def get_claim_txos_for_name(self, name: str): txos = {} - prefix = Prefixes.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big') - for k, v in self.db.iterator(prefix=prefix): - tx_num, nout = Prefixes.claim_short_id.unpack_value(v) + prefix = self.prefix_db.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big') + for k, v in self.prefix_db.iterator(prefix=prefix): + tx_num, nout = self.prefix_db.claim_short_id.unpack_value(v) txos[self.get_claim_from_txo(tx_num, nout).claim_hash] = tx_num, nout return txos def get_claim_metadata(self, tx_hash, nout): - raw = self.db.get(Prefixes.tx.pack_key(tx_hash)) + raw = self.prefix_db.tx.get(tx_hash, deserialize_value=False) try: output = self.coin.transaction(raw).outputs[nout] script = OutputScript(output.pk_script) @@ -547,7 +487,7 @@ class LevelDB: except: self.logger.error( "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(), - raw.hex() + (raw or b'').hex() ) return @@ -577,7 +517,7 @@ class LevelDB: if not reposted_claim: return reposted_metadata = self.get_claim_metadata( - self.total_transactions[reposted_claim.tx_num], reposted_claim.position + self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position ) if not reposted_metadata: return @@ -591,8 +531,8 @@ class LevelDB: reposted_fee_currency = None reposted_duration = None if reposted_claim: - reposted_tx_hash = self.total_transactions[reposted_claim.tx_num] - raw_reposted_claim_tx = self.db.get(Prefixes.tx.pack_key(reposted_tx_hash)) + reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False) + raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False) try: reposted_claim_txo = self.coin.transaction( raw_reposted_claim_tx @@ -727,7 +667,7 @@ class LevelDB: batch = [] for claim_hash, claim_txo in self.claim_to_txo.items(): # TODO: fix the couple of claim txos that dont have controlling names - if not self.db.get(Prefixes.claim_takeover.pack_key(claim_txo.normalized_name)): + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): continue claim = self._fs_get_claim_by_hash(claim_hash) if claim: @@ -793,23 +733,17 @@ class LevelDB: def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list) - for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)): - k = Prefixes.pending_activation.unpack_key(_k) - v = Prefixes.pending_activation.unpack_value(_v) + for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)): activated[v].append(k) return activated - def get_future_activated(self, height: int) -> typing.Generator[ - Tuple[PendingActivationValue, PendingActivationKey], None, None]: - yielded = set() - start_prefix = Prefixes.pending_activation.pack_partial_key(height + 1) - stop_prefix = Prefixes.pending_activation.pack_partial_key(height + 1 + self.coin.maxTakeoverDelay) - for _k, _v in self.db.iterator(start=start_prefix, stop=stop_prefix, reverse=True): - if _v not in yielded: - yielded.add(_v) - v = Prefixes.pending_activation.unpack_value(_v) - k = Prefixes.pending_activation.unpack_key(_k) - yield v, k + def get_future_activated(self, height: int) -> typing.Dict[PendingActivationValue, PendingActivationKey]: + results = {} + for k, v in self.prefix_db.pending_activation.iterate( + start=(height + 1,), stop=(height + 1 + self.coin.maxTakeoverDelay,), reverse=True): + if v not in results: + results[v] = k + return results async def _read_tx_counts(self): if self.tx_counts is not None: @@ -818,11 +752,9 @@ class LevelDB: # height N. So tx_counts[0] is 1 - the genesis coinbase def get_counts(): - return tuple( - Prefixes.tx_count.unpack_value(packed_tx_count).tx_count - for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False, - fill_cache=False) - ) + return [ + v.tx_count for v in self.prefix_db.tx_count.iterate(include_key=False, fill_cache=False) + ] tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts) assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" @@ -834,20 +766,6 @@ class LevelDB: else: assert self.db_tx_count == 0 - async def _read_txids(self): - def get_txids(): - return list(self.db.iterator(prefix=Prefixes.tx_hash.prefix, include_key=False, fill_cache=False)) - - start = time.perf_counter() - self.logger.info("loading txids") - txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) - assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1] - self.total_transactions = txids - self.transaction_num_mapping = { - txid: i for i, txid in enumerate(txids) - } - ts = time.perf_counter() - start - self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4)) async def _read_claim_txos(self): def read_claim_txos(): @@ -870,8 +788,9 @@ class LevelDB: def get_headers(): return [ - header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False, - fill_cache=False) + header for header in self.prefix_db.header.iterate( + include_key=False, fill_cache=False, deserialize_value=False + ) ] headers = await asyncio.get_event_loop().run_in_executor(None, get_headers) @@ -884,23 +803,13 @@ class LevelDB: return int(160.6855883050695 * height) async def open_dbs(self): - if self.db: + if self.prefix_db and not self.prefix_db.closed: return - path = os.path.join(self.env.db_dir, 'lbry-leveldb') - is_new = os.path.isdir(path) - self.db = plyvel.DB( - path, create_if_missing=True, max_open_files=512, - lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, - max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 + self.prefix_db = HubDB( + os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.cache_MB, max_open_files=512 ) - self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value}) - self.prefix_db = PrefixDB(self.db, self.db_op_stack) - - if is_new: - self.logger.info('created new db: %s', f'lbry-leveldb') - else: - self.logger.info(f'opened db: %s', f'lbry-leveldb') + self.logger.info(f'opened db: lbry-leveldb') # read db state self.read_db_state() @@ -929,8 +838,6 @@ class LevelDB: # Read TX counts (requires meta directory) await self._read_tx_counts() - if self.total_transactions is None: - await self._read_txids() await self._read_headers() await self._read_claim_txos() @@ -938,7 +845,7 @@ class LevelDB: await self.search_index.start() def close(self): - self.db.close() + self.prefix_db.close() # Header merkle cache @@ -953,113 +860,6 @@ class LevelDB: async def header_branch_and_root(self, length, height): return await self.header_mc.branch_and_root(length, height) - # Flushing - def assert_flushed(self, flush_data): - """Asserts state is fully flushed.""" - assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count - assert flush_data.height == self.fs_height == self.db_height - assert flush_data.tip == self.db_tip - assert not len(flush_data.put_and_delete_ops) - - def flush_dbs(self, flush_data: FlushData): - if flush_data.height == self.db_height: - self.assert_flushed(flush_data) - return - - min_height = self.min_undo_height(self.db_height) - delete_undo_keys = [] - if min_height > 0: # delete undos for blocks deep enough they can't be reorged - delete_undo_keys.extend( - self.db.iterator( - start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False - ) - ) - delete_undo_keys.extend( - self.db.iterator( - start=Prefixes.touched_or_deleted.pack_key(0), - stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False - ) - ) - - with self.db.write_batch(transaction=True) as batch: - batch_put = batch.put - batch_delete = batch.delete - - for staged_change in flush_data.put_and_delete_ops: - if staged_change.is_put: - batch_put(staged_change.key, staged_change.value) - else: - batch_delete(staged_change.key) - for delete_key in delete_undo_keys: - batch_delete(delete_key) - - self.fs_height = flush_data.height - self.fs_tx_count = flush_data.tx_count - self.hist_flush_count += 1 - self.hist_unflushed_count = 0 - self.utxo_flush_count = self.hist_flush_count - self.db_height = flush_data.height - self.db_tx_count = flush_data.tx_count - self.db_tip = flush_data.tip - self.last_flush_tx_count = self.fs_tx_count - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - self.write_db_state(batch) - - def flush_backup(self, flush_data): - assert flush_data.height < self.db_height - assert not self.hist_unflushed - - start_time = time.time() - tx_delta = flush_data.tx_count - self.last_flush_tx_count - ### - self.fs_tx_count = flush_data.tx_count - # Truncate header_mc: header count is 1 more than the height. - self.header_mc.truncate(flush_data.height + 1) - ### - # Not certain this is needed, but it doesn't hurt - self.hist_flush_count += 1 - nremoves = 0 - - with self.db.write_batch(transaction=True) as batch: - batch_put = batch.put - batch_delete = batch.delete - for op in flush_data.put_and_delete_ops: - # print("REWIND", op) - if op.is_put: - batch_put(op.key, op.value) - else: - batch_delete(op.key) - while self.fs_height > flush_data.height: - self.fs_height -= 1 - - start_time = time.time() - if self.db.for_sync: - block_count = flush_data.height - self.db_height - tx_count = flush_data.tx_count - self.db_tx_count - elapsed = time.time() - start_time - self.logger.info(f'flushed {block_count:,d} blocks with ' - f'{tx_count:,d} txs in ' - f'{elapsed:.1f}s, committing...') - - self.utxo_flush_count = self.hist_flush_count - self.db_height = flush_data.height - self.db_tx_count = flush_data.tx_count - self.db_tip = flush_data.tip - - # Flush state last as it reads the wall time. - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - self.last_flush_tx_count = self.fs_tx_count - self.write_db_state(batch) - - self.logger.info(f'backing up removed {nremoves:,d} history entries') - elapsed = self.last_flush - start_time - self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. ' - f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - def raw_header(self, height): """Return the binary header at the given height.""" header, n = self.read_headers(height, 1) @@ -1103,7 +903,7 @@ class LevelDB: if tx_height > self.db_height: return None, tx_height try: - return self.total_transactions[tx_num], tx_height + return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height except IndexError: self.logger.exception( "Failed to access a cached transaction, known bug #3142 " @@ -1111,9 +911,17 @@ class LevelDB: ) return None, tx_height + def get_block_txs(self, height: int) -> List[bytes]: + return [ + tx_hash for tx_hash in self.prefix_db.tx_hash.iterate( + start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],), + deserialize_value=False, include_key=False + ) + ] + def _fs_transactions(self, txids: Iterable[str]): tx_counts = self.tx_counts - tx_db_get = self.db.get + tx_db_get = self.prefix_db.tx.get tx_cache = self._tx_and_merkle_cache tx_infos = {} @@ -1123,13 +931,14 @@ class LevelDB: tx, merkle = cached_tx else: tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = self.transaction_num_mapping.get(tx_hash_bytes) + tx_num = self.prefix_db.tx_num.get(tx_hash_bytes) tx = None tx_height = -1 + tx_num = None if not tx_num else tx_num.tx_num if tx_num is not None: fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 tx_height = bisect_right(tx_counts, tx_num) - tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes), fill_cache=fill_cache) + tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) if tx_height == -1: merkle = { 'block_height': -1 @@ -1137,7 +946,7 @@ class LevelDB: else: tx_pos = tx_num - tx_counts[tx_height - 1] branch, root = self.merkle.branch_and_root( - self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos + self.get_block_txs(tx_height), tx_pos ) merkle = { 'block_height': tx_height, @@ -1160,15 +969,17 @@ class LevelDB: raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') return [self.coin.header_hash(header) for header in self.headers[height:height + count]] - def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: - txs = array.array('I') - for hist in self.db.iterator(prefix=Prefixes.hashX_history.pack_partial_key(hashX), include_key=False): - a = array.array('I') - a.frombytes(hist) - txs.extend(a) + def read_history(self, hashX: bytes, limit: int = 1000) -> List[Tuple[bytes, int]]: + txs = [] + txs_extend = txs.extend + for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): + txs_extend([ + (self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num)) + for tx_num in hist + ]) if len(txs) >= limit: break - return txs.tolist() + return txs async def limited_history(self, hashX, *, limit=1000): """Return an unpruned, sorted list of (tx_hash, height) tuples of @@ -1177,13 +988,7 @@ class LevelDB: transactions. By default returns at most 1000 entries. Set limit to None to get them all. """ - while True: - history = await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit) - if history is not None: - return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history] - self.logger.warning(f'limited_history: tx hash ' - f'not found (reorg?), retrying...') - await sleep(0.25) + return await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit) # -- Undo information @@ -1191,45 +996,33 @@ class LevelDB: """Returns a height from which we should store undo info.""" return max_height - self.env.reorg_limit + 1 - def undo_key(self, height: int) -> bytes: - """DB key for undo information at the given height.""" - return Prefixes.undo.pack_key(height) - def read_undo_info(self, height: int): - return self.db.get(Prefixes.undo.pack_key(height)), self.db.get(Prefixes.touched_or_deleted.pack_key(height)) + return self.prefix_db.undo.get(height), self.prefix_db.touched_or_deleted.get(height) def apply_expiration_extension_fork(self): # TODO: this can't be reorged - deletes = [] - adds = [] - - for k, v in self.db.iterator(prefix=Prefixes.claim_expiration.prefix): - old_key = Prefixes.claim_expiration.unpack_key(k) - new_key = Prefixes.claim_expiration.pack_key( - bisect_right(self.tx_counts, old_key.tx_num) + self.coin.nExtendedClaimExpirationTime, - old_key.tx_num, old_key.position + for k, v in self.prefix_db.claim_expiration.iterate(): + self.prefix_db.claim_expiration.stage_delete(k, v) + self.prefix_db.claim_expiration.stage_put( + (bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime, + k.tx_num, k.position), v ) - deletes.append(k) - adds.append((new_key, v)) - with self.db.write_batch(transaction=True) as batch: - for k in deletes: - batch.delete(k) - for k, v in adds: - batch.put(k, v) + self.prefix_db.unsafe_commit() - def write_db_state(self, batch): + def write_db_state(self): """Write (UTXO) state to the batch.""" - batch.put( - DB_PREFIXES.db_state.value, - DBState( + if self.db_height > 0: + self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get()) + self.prefix_db.db_state.stage_put((), ( self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version, self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor - ).pack() + ) ) def read_db_state(self): - state = self.db.get(DB_PREFIXES.db_state.value) + state = self.prefix_db.db_state.get() + if not state: self.db_height = -1 self.db_tx_count = 0 @@ -1243,7 +1036,6 @@ class LevelDB: self.hist_comp_cursor = -1 self.hist_db_version = max(self.DB_VERSIONS) else: - state = DBState.unpack(state) self.db_version = state.db_version if self.db_version not in self.DB_VERSIONS: raise DBError(f'your DB version is {self.db_version} but this ' @@ -1264,15 +1056,21 @@ class LevelDB: self.hist_comp_cursor = state.comp_cursor self.hist_db_version = state.db_version + def assert_db_state(self): + state = self.prefix_db.db_state.get() + assert self.db_version == state.db_version + assert self.db_height == state.height + assert self.db_tx_count == state.tx_count + assert self.db_tip == state.tip + assert self.first_sync == state.first_sync + async def all_utxos(self, hashX): """Return all UTXOs for an address sorted in no particular order.""" def read_utxos(): utxos = [] utxos_append = utxos.append fs_tx_hash = self.fs_tx_hash - for db_key, db_value in self.db.iterator(prefix=Prefixes.utxo.pack_partial_key(hashX)): - k = Prefixes.utxo.unpack_key(db_key) - v = Prefixes.utxo.unpack_value(db_value) + for k, v in self.prefix_db.utxo.iterate(prefix=(hashX, )): tx_hash, height = fs_tx_hash(k.tx_num) utxos_append(UTXO(k.tx_num, k.nout, tx_hash, height, v.amount)) return utxos @@ -1290,14 +1088,16 @@ class LevelDB: utxos = [] utxo_append = utxos.append for (tx_hash, nout) in prevouts: - if tx_hash not in self.transaction_num_mapping: + tx_num_val = self.prefix_db.tx_num.get(tx_hash) + if not tx_num_val: continue - tx_num = self.transaction_num_mapping[tx_hash] - hashX = self.db.get(Prefixes.hashX_utxo.pack_key(tx_hash[:4], tx_num, nout)) - if not hashX: + tx_num = tx_num_val.tx_num + hashX_val = self.prefix_db.hashX_utxo.get(tx_hash[:4], tx_num, nout) + if not hashX_val: continue - utxo_value = self.db.get(Prefixes.utxo.pack_key(hashX, tx_num, nout)) + hashX = hashX_val.hashX + utxo_value = self.prefix_db.utxo.get(hashX, tx_num, nout) if utxo_value: - utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount)) + utxo_append((hashX, utxo_value.amount)) return utxos return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos) diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 04416e117..6899d0974 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -115,7 +115,7 @@ class BaseResolveTestCase(CommandTestCase): def check_supports(claim_id, lbrycrd_supports): for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): support = lbrycrd_supports[i] - self.assertEqual(support['txId'], db.total_transactions[tx_num][::-1].hex()) + self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex()) self.assertEqual(support['n'], position) self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num)) self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True)) @@ -127,7 +127,7 @@ class BaseResolveTestCase(CommandTestCase): check_supports(c['claimId'], c['supports']) claim_hash = bytes.fromhex(c['claimId']) self.assertEqual(c['validAtHeight'], db.get_activation( - db.transaction_num_mapping[bytes.fromhex(c['txId'])[::-1]], c['n'] + db.prefix_db.tx_num.get(bytes.fromhex(c['txId'])[::-1]).tx_num, c['n'] )) self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash)) @@ -1451,19 +1451,13 @@ class ResolveAfterReorg(BaseResolveTestCase): async def assertBlockHash(self, height): bp = self.conductor.spv_node.server.bp - - def get_txids(): - return [ - bp.db.fs_tx_hash(tx_num)[0][::-1].hex() - for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height]) - ] - block_hash = await self.blockchain.get_block_hash(height) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) - - txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) + txids = [ + tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height) + ] txs = await bp.db.fs_transactions(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') diff --git a/tests/unit/wallet/server/test_revertable.py b/tests/unit/wallet/server/test_revertable.py index 1fa765537..42318b53b 100644 --- a/tests/unit/wallet/server/test_revertable.py +++ b/tests/unit/wallet/server/test_revertable.py @@ -1,6 +1,8 @@ import unittest +import tempfile +import shutil from lbry.wallet.server.db.revertable import RevertableOpStack, RevertableDelete, RevertablePut, OpStackIntegrity -from lbry.wallet.server.db.prefixes import Prefixes +from lbry.wallet.server.db.prefixes import ClaimToTXOPrefixRow, HubDB class TestRevertableOpStack(unittest.TestCase): @@ -25,14 +27,14 @@ class TestRevertableOpStack(unittest.TestCase): self.stack.append_op(RevertablePut(key2, value2)) def test_simplify(self): - key1 = Prefixes.claim_to_txo.pack_key(b'\x01' * 20) - key2 = Prefixes.claim_to_txo.pack_key(b'\x02' * 20) - key3 = Prefixes.claim_to_txo.pack_key(b'\x03' * 20) - key4 = Prefixes.claim_to_txo.pack_key(b'\x04' * 20) + key1 = ClaimToTXOPrefixRow.pack_key(b'\x01' * 20) + key2 = ClaimToTXOPrefixRow.pack_key(b'\x02' * 20) + key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20) + key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20) - val1 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'derp') - val2 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'oops') - val3 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'other') + val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'derp') + val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'oops') + val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'other') # check that we can't delete a non existent value with self.assertRaises(OpStackIntegrity): @@ -101,3 +103,48 @@ class TestRevertableOpStack(unittest.TestCase): self.process_stack() self.assertDictEqual({key2: val3}, self.fake_db) + +class TestRevertablePrefixDB(unittest.TestCase): + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.db = HubDB(self.tmp_dir, cache_mb=1, max_open_files=32) + + def tearDown(self) -> None: + self.db.close() + shutil.rmtree(self.tmp_dir) + + def test_rollback(self): + name = 'derp' + claim_hash1 = 20 * b'\x00' + claim_hash2 = 20 * b'\x01' + claim_hash3 = 20 * b'\x02' + + takeover_height = 10000000 + + self.assertIsNone(self.db.claim_takeover.get(name)) + self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height)) + self.db.commit(10000000) + self.assertEqual(10000000, self.db.claim_takeover.get(name).height) + + self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height)) + self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1)) + self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1)) + self.db.commit(10000001) + self.assertIsNone(self.db.claim_takeover.get(name)) + self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2)) + self.db.commit(10000002) + self.assertEqual(10000002, self.db.claim_takeover.get(name).height) + + self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2)) + self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3)) + self.db.commit(10000003) + self.assertEqual(10000003, self.db.claim_takeover.get(name).height) + + self.db.rollback(10000003) + self.assertEqual(10000002, self.db.claim_takeover.get(name).height) + self.db.rollback(10000002) + self.assertIsNone(self.db.claim_takeover.get(name)) + self.db.rollback(10000001) + self.assertEqual(10000000, self.db.claim_takeover.get(name).height) + self.db.rollback(10000000) + self.assertIsNone(self.db.claim_takeover.get(name))