block processor db refactoring
-access db through HubDB class, don't use plyvel.DB directly -add channel count and support amount prefixes
This commit is contained in:
parent
01ee4b23e6
commit
4cf76123e5
8 changed files with 924 additions and 861 deletions
|
@ -17,15 +17,11 @@ from lbry.wallet.server.daemon import DaemonError
|
|||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
from lbry.wallet.server.util import chunks, class_logger
|
||||
from lbry.crypto.hash import hash160
|
||||
from lbry.wallet.server.leveldb import FlushData
|
||||
from lbry.wallet.server.mempool import MemPool
|
||||
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
|
||||
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
|
||||
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
|
||||
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue
|
||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
|
||||
from lbry.wallet.server.udp import StatusServer
|
||||
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
|
||||
|
@ -153,6 +149,31 @@ class ChainError(Exception):
|
|||
"""Raised on error processing blocks."""
|
||||
|
||||
|
||||
class StagedClaimtrieItem(typing.NamedTuple):
|
||||
name: str
|
||||
normalized_name: str
|
||||
claim_hash: bytes
|
||||
amount: int
|
||||
expiration_height: int
|
||||
tx_num: int
|
||||
position: int
|
||||
root_tx_num: int
|
||||
root_position: int
|
||||
channel_signature_is_valid: bool
|
||||
signing_hash: Optional[bytes]
|
||||
reposted_claim_hash: Optional[bytes]
|
||||
|
||||
@property
|
||||
def is_update(self) -> bool:
|
||||
return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
|
||||
|
||||
def invalidate_signature(self) -> 'StagedClaimtrieItem':
|
||||
return StagedClaimtrieItem(
|
||||
self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
|
||||
self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
|
||||
)
|
||||
|
||||
|
||||
NAMESPACE = "wallet_server"
|
||||
HISTOGRAM_BUCKETS = (
|
||||
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
||||
|
@ -257,6 +278,7 @@ class BlockProcessor:
|
|||
|
||||
self.pending_reposted = set()
|
||||
self.pending_channel_counts = defaultdict(lambda: 0)
|
||||
self.pending_support_amount_change = defaultdict(lambda: 0)
|
||||
|
||||
self.pending_channels = {}
|
||||
self.amount_cache = {}
|
||||
|
@ -266,6 +288,9 @@ class BlockProcessor:
|
|||
self.claim_channels: Dict[bytes, bytes] = {}
|
||||
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
|
||||
|
||||
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
|
||||
self.pending_transactions: Dict[int, bytes] = {}
|
||||
|
||||
async def claim_producer(self):
|
||||
if self.db.db_height <= 1:
|
||||
return
|
||||
|
@ -319,7 +344,7 @@ class BlockProcessor:
|
|||
self.logger.warning(
|
||||
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
||||
)
|
||||
await self.run_in_thread(self.db.apply_expiration_extension_fork)
|
||||
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
|
||||
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
||||
if not self.db.first_sync:
|
||||
await self.db.reload_blocking_filtering_streams()
|
||||
|
@ -362,7 +387,6 @@ class BlockProcessor:
|
|||
assert count > 0, count
|
||||
for _ in range(count):
|
||||
await self.backup_block()
|
||||
await self.flush()
|
||||
self.logger.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
await self.db._read_claim_txos() # TODO: don't do this
|
||||
|
@ -392,23 +416,12 @@ class BlockProcessor:
|
|||
'resetting the prefetcher')
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
|
||||
# - Flushing
|
||||
def flush_data(self):
|
||||
"""The data for a flush. The lock must be taken."""
|
||||
assert self.state_lock.locked()
|
||||
return FlushData(self.height, self.tx_count, self.db_op_stack, self.tip)
|
||||
|
||||
async def flush(self):
|
||||
def flush():
|
||||
self.db.flush_dbs(self.flush_data())
|
||||
self.db.write_db_state()
|
||||
self.db.prefix_db.commit(self.height)
|
||||
self.clear_after_advance_or_reorg()
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
async def write_state(self):
|
||||
def flush():
|
||||
with self.db.db.write_batch(transaction=True) as batch:
|
||||
self.db.write_db_state(batch)
|
||||
|
||||
self.db.assert_db_state()
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
|
||||
|
@ -456,7 +469,11 @@ class BlockProcessor:
|
|||
signing_channel = self.db.get_claim_txo(signing_channel_hash)
|
||||
|
||||
if signing_channel:
|
||||
raw_channel_tx = self.db.prefix_db.tx.get(self.db.total_transactions[signing_channel.tx_num]).raw_tx
|
||||
raw_channel_tx = self.db.prefix_db.tx.get(
|
||||
self.db.prefix_db.tx_hash.get(
|
||||
signing_channel.tx_num, deserialize_value=False
|
||||
), deserialize_value=False
|
||||
)
|
||||
channel_pub_key_bytes = None
|
||||
try:
|
||||
if not signing_channel:
|
||||
|
@ -503,11 +520,9 @@ class BlockProcessor:
|
|||
root_tx_num, root_idx = previous_claim.root_tx_num, previous_claim.root_position
|
||||
activation = self.db.get_activation(prev_tx_num, prev_idx)
|
||||
claim_name = previous_claim.name
|
||||
self.db_op_stack.extend_ops(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, normalized_name,
|
||||
previous_claim.amount
|
||||
).get_remove_activate_ops()
|
||||
self.get_remove_activate_ops(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, claim_hash, prev_tx_num, prev_idx, activation, normalized_name,
|
||||
previous_claim.amount
|
||||
)
|
||||
previous_amount = previous_claim.amount
|
||||
self.updated_claims.add(claim_hash)
|
||||
|
@ -523,16 +538,101 @@ class BlockProcessor:
|
|||
)
|
||||
self.txo_to_claim[(tx_num, nout)] = pending
|
||||
self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
|
||||
self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
|
||||
self.get_add_claim_utxo_ops(pending)
|
||||
|
||||
def get_add_claim_utxo_ops(self, pending: StagedClaimtrieItem):
|
||||
# claim tip by claim hash
|
||||
self.db.prefix_db.claim_to_txo.stage_put(
|
||||
(pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position,
|
||||
pending.amount, pending.channel_signature_is_valid, pending.name)
|
||||
)
|
||||
# claim hash by txo
|
||||
self.db.prefix_db.txo_to_claim.stage_put(
|
||||
(pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name)
|
||||
)
|
||||
|
||||
# claim expiration
|
||||
self.db.prefix_db.claim_expiration.stage_put(
|
||||
(pending.expiration_height, pending.tx_num, pending.position),
|
||||
(pending.claim_hash, pending.normalized_name)
|
||||
)
|
||||
|
||||
# short url resolution
|
||||
for prefix_len in range(10):
|
||||
self.db.prefix_db.claim_short_id.stage_put(
|
||||
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
|
||||
pending.root_tx_num, pending.root_position),
|
||||
(pending.tx_num, pending.position)
|
||||
)
|
||||
|
||||
if pending.signing_hash and pending.channel_signature_is_valid:
|
||||
# channel by stream
|
||||
self.db.prefix_db.claim_to_channel.stage_put(
|
||||
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
|
||||
)
|
||||
# stream by channel
|
||||
self.db.prefix_db.channel_to_claim.stage_put(
|
||||
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
|
||||
(pending.claim_hash,)
|
||||
)
|
||||
|
||||
if pending.reposted_claim_hash:
|
||||
self.db.prefix_db.repost.stage_put((pending.claim_hash,), (pending.reposted_claim_hash,))
|
||||
self.db.prefix_db.reposted_claim.stage_put(
|
||||
(pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
|
||||
)
|
||||
|
||||
def get_remove_claim_utxo_ops(self, pending: StagedClaimtrieItem):
|
||||
# claim tip by claim hash
|
||||
self.db.prefix_db.claim_to_txo.stage_delete(
|
||||
(pending.claim_hash,), (pending.tx_num, pending.position, pending.root_tx_num, pending.root_position,
|
||||
pending.amount, pending.channel_signature_is_valid, pending.name)
|
||||
)
|
||||
# claim hash by txo
|
||||
self.db.prefix_db.txo_to_claim.stage_delete(
|
||||
(pending.tx_num, pending.position), (pending.claim_hash, pending.normalized_name)
|
||||
)
|
||||
|
||||
# claim expiration
|
||||
self.db.prefix_db.claim_expiration.stage_delete(
|
||||
(pending.expiration_height, pending.tx_num, pending.position),
|
||||
(pending.claim_hash, pending.normalized_name)
|
||||
)
|
||||
|
||||
# short url resolution
|
||||
for prefix_len in range(10):
|
||||
self.db.prefix_db.claim_short_id.stage_delete(
|
||||
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
|
||||
pending.root_tx_num, pending.root_position),
|
||||
(pending.tx_num, pending.position)
|
||||
)
|
||||
|
||||
if pending.signing_hash and pending.channel_signature_is_valid:
|
||||
# channel by stream
|
||||
self.db.prefix_db.claim_to_channel.stage_delete(
|
||||
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
|
||||
)
|
||||
# stream by channel
|
||||
self.db.prefix_db.channel_to_claim.stage_delete(
|
||||
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
|
||||
(pending.claim_hash,)
|
||||
)
|
||||
|
||||
if pending.reposted_claim_hash:
|
||||
self.db.prefix_db.repost.stage_delete((pending.claim_hash,), (pending.reposted_claim_hash,))
|
||||
self.db.prefix_db.reposted_claim.stage_delete(
|
||||
(pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
|
||||
)
|
||||
|
||||
def _add_support(self, height: int, txo: 'Output', tx_num: int, nout: int):
|
||||
supported_claim_hash = txo.claim_hash[::-1]
|
||||
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
|
||||
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
|
||||
# print(f"\tsupport claim {supported_claim_hash.hex()} +{txo.amount}")
|
||||
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
||||
supported_claim_hash, tx_num, nout, txo.amount
|
||||
).get_add_support_utxo_ops())
|
||||
|
||||
self.db.prefix_db.claim_to_support.stage_put((supported_claim_hash, tx_num, nout), (txo.amount,))
|
||||
self.db.prefix_db.support_to_claim.stage_put((tx_num, nout), (supported_claim_hash,))
|
||||
self.pending_support_amount_change[supported_claim_hash] += txo.amount
|
||||
|
||||
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output',
|
||||
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]):
|
||||
|
@ -542,7 +642,7 @@ class BlockProcessor:
|
|||
self._add_support(height, txo, tx_num, nout)
|
||||
|
||||
def _spend_support_txo(self, height: int, txin: TxInput):
|
||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||
txin_num = self.get_pending_tx_num(txin.prev_hash)
|
||||
activation = 0
|
||||
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
|
||||
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
|
||||
|
@ -561,17 +661,17 @@ class BlockProcessor:
|
|||
if 0 < activation < self.height + 1:
|
||||
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
|
||||
if supported_name is not None and activation > 0:
|
||||
self.db_op_stack.extend_ops(StagedActivation(
|
||||
self.get_remove_activate_ops(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
|
||||
support_amount
|
||||
).get_remove_activate_ops())
|
||||
)
|
||||
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
|
||||
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
||||
spent_support, txin_num, txin.prev_idx, support_amount
|
||||
).get_spend_support_txo_ops())
|
||||
self.db.prefix_db.claim_to_support.stage_delete((spent_support, txin_num, txin.prev_idx), (support_amount,))
|
||||
self.db.prefix_db.support_to_claim.stage_delete((txin_num, txin.prev_idx), (spent_support,))
|
||||
self.pending_support_amount_change[spent_support] -= support_amount
|
||||
|
||||
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool:
|
||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||
txin_num = self.get_pending_tx_num(txin.prev_hash)
|
||||
if (txin_num, txin.prev_idx) in self.txo_to_claim:
|
||||
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
|
||||
else:
|
||||
|
@ -593,7 +693,7 @@ class BlockProcessor:
|
|||
self.pending_channel_counts[spent.signing_hash] -= 1
|
||||
spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.normalized_name)
|
||||
# print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
|
||||
self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops())
|
||||
self.get_remove_claim_utxo_ops(spent)
|
||||
return True
|
||||
|
||||
def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims):
|
||||
|
@ -633,9 +733,31 @@ class BlockProcessor:
|
|||
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
|
||||
self._invalidate_channel_signatures(claim_hash)
|
||||
|
||||
def _get_invalidate_signature_ops(self, pending: StagedClaimtrieItem):
|
||||
if not pending.signing_hash:
|
||||
return
|
||||
self.db.prefix_db.claim_to_channel.stage_delete(
|
||||
(pending.claim_hash, pending.tx_num, pending.position), (pending.signing_hash,)
|
||||
)
|
||||
if pending.channel_signature_is_valid:
|
||||
self.db.prefix_db.channel_to_claim.stage_delete(
|
||||
(pending.signing_hash, pending.normalized_name, pending.tx_num, pending.position),
|
||||
(pending.claim_hash,)
|
||||
)
|
||||
self.db.prefix_db.claim_to_txo.stage_delete(
|
||||
(pending.claim_hash,),
|
||||
(pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount,
|
||||
pending.channel_signature_is_valid, pending.name)
|
||||
)
|
||||
self.db.prefix_db.claim_to_txo.stage_put(
|
||||
(pending.claim_hash,),
|
||||
(pending.tx_num, pending.position, pending.root_tx_num, pending.root_position, pending.amount,
|
||||
False, pending.name)
|
||||
)
|
||||
|
||||
def _invalidate_channel_signatures(self, claim_hash: bytes):
|
||||
for k, signed_claim_hash in self.db.db.iterator(
|
||||
prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):
|
||||
for (signed_claim_hash, ) in self.db.prefix_db.channel_to_claim.iterate(
|
||||
prefix=(claim_hash, ), include_key=False):
|
||||
if signed_claim_hash in self.abandoned_claims or signed_claim_hash in self.expired_claim_hashes:
|
||||
continue
|
||||
# there is no longer a signing channel for this claim as of this block
|
||||
|
@ -657,12 +779,12 @@ class BlockProcessor:
|
|||
claim = self._make_pending_claim_txo(signed_claim_hash)
|
||||
self.signatures_changed.add(signed_claim_hash)
|
||||
self.pending_channel_counts[claim_hash] -= 1
|
||||
self.db_op_stack.extend_ops(claim.get_invalidate_signature_ops())
|
||||
self._get_invalidate_signature_ops(claim)
|
||||
|
||||
for staged in list(self.txo_to_claim.values()):
|
||||
needs_invalidate = staged.claim_hash not in self.doesnt_have_valid_signature
|
||||
if staged.signing_hash == claim_hash and needs_invalidate:
|
||||
self.db_op_stack.extend_ops(staged.get_invalidate_signature_ops())
|
||||
self._get_invalidate_signature_ops(staged)
|
||||
self.txo_to_claim[self.claim_hash_to_txo[staged.claim_hash]] = staged.invalidate_signature()
|
||||
self.signatures_changed.add(staged.claim_hash)
|
||||
self.pending_channel_counts[claim_hash] -= 1
|
||||
|
@ -758,6 +880,30 @@ class BlockProcessor:
|
|||
support_amount = self._get_pending_supported_amount(claim_hash, height=height)
|
||||
return claim_amount + support_amount
|
||||
|
||||
def get_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int,
|
||||
activation_height: int, name: str, amount: int):
|
||||
self.db.prefix_db.activated.stage_put(
|
||||
(txo_type, tx_num, position), (activation_height, claim_hash, name)
|
||||
)
|
||||
self.db.prefix_db.pending_activation.stage_put(
|
||||
(activation_height, txo_type, tx_num, position), (claim_hash, name)
|
||||
)
|
||||
self.db.prefix_db.active_amount.stage_put(
|
||||
(claim_hash, txo_type, activation_height, tx_num, position), (amount,)
|
||||
)
|
||||
|
||||
def get_remove_activate_ops(self, txo_type: int, claim_hash: bytes, tx_num: int, position: int,
|
||||
activation_height: int, name: str, amount: int):
|
||||
self.db.prefix_db.activated.stage_delete(
|
||||
(txo_type, tx_num, position), (activation_height, claim_hash, name)
|
||||
)
|
||||
self.db.prefix_db.pending_activation.stage_delete(
|
||||
(activation_height, txo_type, tx_num, position), (claim_hash, name)
|
||||
)
|
||||
self.db.prefix_db.active_amount.stage_delete(
|
||||
(claim_hash, txo_type, activation_height, tx_num, position), (amount,)
|
||||
)
|
||||
|
||||
def _get_takeover_ops(self, height: int):
|
||||
|
||||
# cache for controlling claims as of the previous block
|
||||
|
@ -779,7 +925,7 @@ class BlockProcessor:
|
|||
future_activations = defaultdict(dict)
|
||||
|
||||
def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int,
|
||||
amount: int, is_support: bool) -> List['RevertableOp']:
|
||||
amount: int, is_support: bool):
|
||||
controlling = get_controlling(name)
|
||||
nothing_is_controlling = not controlling
|
||||
staged_is_controlling = False if not controlling else claim_hash == controlling.claim_hash
|
||||
|
@ -812,10 +958,10 @@ class BlockProcessor:
|
|||
))
|
||||
if is_support:
|
||||
self.possible_future_support_txos_by_claim_hash[claim_hash].append((tx_num, nout))
|
||||
return StagedActivation(
|
||||
self.get_activate_ops(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, claim_hash, tx_num, nout,
|
||||
height + delay, name, amount
|
||||
).get_activate_ops()
|
||||
)
|
||||
|
||||
# determine names needing takeover/deletion due to controlling claims being abandoned
|
||||
# and add ops to deactivate abandoned claims
|
||||
|
@ -827,11 +973,9 @@ class BlockProcessor:
|
|||
activation = self.db.get_activation(staged.tx_num, staged.position)
|
||||
if activation > 0: # db returns -1 for non-existent txos
|
||||
# removed queued future activation from the db
|
||||
self.db_op_stack.extend_ops(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
|
||||
activation, staged.normalized_name, staged.amount
|
||||
).get_remove_activate_ops()
|
||||
self.get_remove_activate_ops(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
|
||||
activation, staged.normalized_name, staged.amount
|
||||
)
|
||||
else:
|
||||
# it hadn't yet been activated
|
||||
|
@ -868,10 +1012,10 @@ class BlockProcessor:
|
|||
prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position)
|
||||
if height < prev_activation or prev_activation < 0:
|
||||
is_delayed = True
|
||||
self.db_op_stack.extend_ops(get_delayed_activate_ops(
|
||||
get_delayed_activate_ops(
|
||||
staged.normalized_name, staged.claim_hash, is_delayed, tx_num, nout, staged.amount,
|
||||
is_support=False
|
||||
))
|
||||
)
|
||||
|
||||
# and the supports
|
||||
for (tx_num, nout), (claim_hash, amount) in self.support_txo_to_claim.items():
|
||||
|
@ -889,9 +1033,9 @@ class BlockProcessor:
|
|||
v = supported_claim_info
|
||||
name = v.normalized_name
|
||||
staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position)
|
||||
self.db_op_stack.extend_ops(get_delayed_activate_ops(
|
||||
get_delayed_activate_ops(
|
||||
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
|
||||
))
|
||||
)
|
||||
|
||||
# add the activation/delayed-activation ops
|
||||
for activated, activated_txos in activated_at_height.items():
|
||||
|
@ -962,7 +1106,9 @@ class BlockProcessor:
|
|||
if not has_candidate:
|
||||
# remove name takeover entry, the name is now unclaimed
|
||||
controlling = get_controlling(need_takeover)
|
||||
self.db_op_stack.extend_ops(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
|
||||
self.db.prefix_db.claim_takeover.stage_delete(
|
||||
(need_takeover,), (controlling.claim_hash, controlling.height)
|
||||
)
|
||||
|
||||
# scan for possible takeovers out of the accumulated activations, of these make sure there
|
||||
# aren't any future activations for the taken over names with yet higher amounts, if there are
|
||||
|
@ -973,7 +1119,7 @@ class BlockProcessor:
|
|||
# upon the delayed activation of B, we need to detect to activate C and make it take over early instead
|
||||
|
||||
claim_exists = {}
|
||||
for activated, activated_claim_txo in self.db.get_future_activated(height):
|
||||
for activated, activated_claim_txo in self.db.get_future_activated(height).items():
|
||||
# uses the pending effective amount for the future activation height, not the current height
|
||||
future_amount = self._get_pending_claim_amount(
|
||||
activated.normalized_name, activated.claim_hash, activated_claim_txo.height + 1
|
||||
|
@ -1060,32 +1206,32 @@ class BlockProcessor:
|
|||
break
|
||||
assert None not in (amount, activation)
|
||||
# update the claim that's activating early
|
||||
self.db_op_stack.extend_ops(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
|
||||
position, activation, name, amount
|
||||
).get_remove_activate_ops() + \
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
|
||||
position, height, name, amount
|
||||
).get_activate_ops()
|
||||
self.get_remove_activate_ops(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
|
||||
position, activation, name, amount
|
||||
)
|
||||
self.get_activate_ops(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
|
||||
position, height, name, amount
|
||||
)
|
||||
|
||||
for (k, amount) in activate_in_future[name][winning_including_future_activations]:
|
||||
txo = (k.tx_num, k.position)
|
||||
if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]:
|
||||
self.db_op_stack.extend_ops(
|
||||
StagedActivation(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
|
||||
k.position, k.height, name, amount
|
||||
).get_remove_activate_ops() + \
|
||||
StagedActivation(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
|
||||
k.position, height, name, amount
|
||||
).get_activate_ops()
|
||||
self.get_remove_activate_ops(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
|
||||
k.position, k.height, name, amount
|
||||
)
|
||||
self.get_activate_ops(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
|
||||
k.position, height, name, amount
|
||||
)
|
||||
self.taken_over_names.add(name)
|
||||
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
|
||||
if controlling:
|
||||
self.db.prefix_db.claim_takeover.stage_delete(
|
||||
(name,), (controlling.claim_hash, controlling.height)
|
||||
)
|
||||
self.db.prefix_db.claim_takeover.stage_put((name,), (winning_including_future_activations, height))
|
||||
self.touched_claim_hashes.add(winning_including_future_activations)
|
||||
if controlling and controlling.claim_hash not in self.abandoned_claims:
|
||||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
|
@ -1106,20 +1252,20 @@ class BlockProcessor:
|
|||
if previous_pending_activate.height > height:
|
||||
# the claim had a pending activation in the future, move it to now
|
||||
if tx_num < self.tx_count:
|
||||
self.db_op_stack.extend_ops(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
|
||||
position, previous_pending_activate.height, name, amount
|
||||
).get_remove_activate_ops()
|
||||
)
|
||||
self.db_op_stack.extend_ops(
|
||||
StagedActivation(
|
||||
self.get_remove_activate_ops(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
|
||||
position, height, name, amount
|
||||
).get_activate_ops()
|
||||
position, previous_pending_activate.height, name, amount
|
||||
)
|
||||
self.get_activate_ops(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
|
||||
position, height, name, amount
|
||||
)
|
||||
self.taken_over_names.add(name)
|
||||
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
|
||||
if controlling:
|
||||
self.db.prefix_db.claim_takeover.stage_delete(
|
||||
(name,), (controlling.claim_hash, controlling.height)
|
||||
)
|
||||
self.db.prefix_db.claim_takeover.stage_put((name,), (winning_claim_hash, height))
|
||||
if controlling and controlling.claim_hash not in self.abandoned_claims:
|
||||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
self.touched_claim_hashes.add(winning_claim_hash)
|
||||
|
@ -1147,7 +1293,11 @@ class BlockProcessor:
|
|||
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
|
||||
self.taken_over_names.add(name)
|
||||
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
|
||||
self.db_op_stack.extend_ops(get_takeover_name_ops(name, winning, height, controlling))
|
||||
if controlling:
|
||||
self.db.prefix_db.claim_takeover.stage_delete(
|
||||
(name,), (controlling.claim_hash, controlling.height)
|
||||
)
|
||||
self.db.prefix_db.claim_takeover.stage_put((name,), (winning, height))
|
||||
if controlling:
|
||||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
self.touched_claim_hashes.add(winning)
|
||||
|
@ -1185,6 +1335,15 @@ class BlockProcessor:
|
|||
)
|
||||
)
|
||||
|
||||
# update support amount totals
|
||||
for supported_claim, amount in self.pending_support_amount_change.items():
|
||||
existing = self.db.prefix_db.support_amount.get(supported_claim)
|
||||
total = amount
|
||||
if existing is not None:
|
||||
total += existing.amount
|
||||
self.db.prefix_db.support_amount.stage_delete((supported_claim,), existing)
|
||||
self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,))
|
||||
|
||||
# use the cumulative changes to update bid ordered resolve
|
||||
for removed in self.removed_claim_hashes:
|
||||
removed_claim = self.db.get_claim_txo(removed)
|
||||
|
@ -1193,10 +1352,9 @@ class BlockProcessor:
|
|||
removed_claim.normalized_name, removed
|
||||
)
|
||||
if amt:
|
||||
self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
|
||||
removed_claim.normalized_name, amt.effective_amount, amt.tx_num,
|
||||
amt.position, removed
|
||||
))
|
||||
self.db.prefix_db.effective_amount.stage_delete(
|
||||
(removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,)
|
||||
)
|
||||
for touched in self.touched_claim_hashes:
|
||||
prev_effective_amount = 0
|
||||
|
||||
|
@ -1208,10 +1366,10 @@ class BlockProcessor:
|
|||
claim_amount_info = self.db.get_url_effective_amount(name, touched)
|
||||
if claim_amount_info:
|
||||
prev_effective_amount = claim_amount_info.effective_amount
|
||||
self.db_op_stack.extend_ops(get_remove_effective_amount_ops(
|
||||
name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
|
||||
claim_amount_info.position, touched
|
||||
))
|
||||
self.db.prefix_db.effective_amount.stage_delete(
|
||||
(name, claim_amount_info.effective_amount, claim_amount_info.tx_num,
|
||||
claim_amount_info.position), (touched,)
|
||||
)
|
||||
else:
|
||||
v = self.db.get_claim_txo(touched)
|
||||
if not v:
|
||||
|
@ -1220,10 +1378,8 @@ class BlockProcessor:
|
|||
amt = self.db.get_url_effective_amount(name, touched)
|
||||
if amt:
|
||||
prev_effective_amount = amt.effective_amount
|
||||
self.db_op_stack.extend_ops(
|
||||
get_remove_effective_amount_ops(
|
||||
name, amt.effective_amount, amt.tx_num, amt.position, touched
|
||||
)
|
||||
self.db.prefix_db.effective_amount.stage_delete(
|
||||
(name, prev_effective_amount, amt.tx_num, amt.position), (touched,)
|
||||
)
|
||||
|
||||
if (name, touched) in self.activated_claim_amount_by_name_and_hash:
|
||||
|
@ -1242,12 +1398,18 @@ class BlockProcessor:
|
|||
touched.hex(), height, False, prev_effective_amount, support_amount
|
||||
)
|
||||
new_effective_amount = self._get_pending_effective_amount(name, touched)
|
||||
self.db_op_stack.extend_ops(
|
||||
get_add_effective_amount_ops(
|
||||
name, new_effective_amount, tx_num, position, touched
|
||||
)
|
||||
self.db.prefix_db.effective_amount.stage_put(
|
||||
(name, new_effective_amount, tx_num, position), (touched,)
|
||||
)
|
||||
|
||||
for channel_hash, count in self.pending_channel_counts.items():
|
||||
if count != 0:
|
||||
channel_count_val = self.db.prefix_db.channel_count.get(channel_hash)
|
||||
channel_count = 0 if not channel_count_val else channel_count_val.count
|
||||
if channel_count_val is not None:
|
||||
self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,))
|
||||
self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,))
|
||||
|
||||
self.touched_claim_hashes.update(
|
||||
{k for k in self.pending_reposted if k not in self.removed_claim_hashes}
|
||||
)
|
||||
|
@ -1319,9 +1481,8 @@ class BlockProcessor:
|
|||
for abandoned_claim_hash, (tx_num, nout, normalized_name) in abandoned_channels.items():
|
||||
# print(f"\tabandon {normalized_name} {abandoned_claim_hash.hex()} {tx_num} {nout}")
|
||||
self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name)
|
||||
|
||||
self.db.total_transactions.append(tx_hash)
|
||||
self.db.transaction_num_mapping[tx_hash] = tx_count
|
||||
self.pending_transactions[tx_count] = tx_hash
|
||||
self.pending_transaction_num_mapping[tx_hash] = tx_count
|
||||
tx_count += 1
|
||||
|
||||
# handle expired claims
|
||||
|
@ -1333,43 +1494,53 @@ class BlockProcessor:
|
|||
# update effective amount and update sets of touched and deleted claims
|
||||
self._get_cumulative_update_ops(height)
|
||||
|
||||
self.db_op_stack.append_op(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
|
||||
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
|
||||
|
||||
for hashX, new_history in self.hashXs_by_tx.items():
|
||||
if not new_history:
|
||||
continue
|
||||
self.db_op_stack.append_op(
|
||||
RevertablePut(
|
||||
*Prefixes.hashX_history.pack_item(
|
||||
hashX, height, new_history
|
||||
)
|
||||
)
|
||||
)
|
||||
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
|
||||
|
||||
self.tx_count = tx_count
|
||||
self.db.tx_counts.append(self.tx_count)
|
||||
|
||||
cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit
|
||||
if height >= cached_max_reorg_depth:
|
||||
self.db_op_stack.append_op(
|
||||
RevertablePut(
|
||||
*Prefixes.touched_or_deleted.pack_item(
|
||||
height, self.touched_claim_hashes, self.removed_claim_hashes
|
||||
)
|
||||
)
|
||||
)
|
||||
self.db_op_stack.append_op(
|
||||
RevertablePut(
|
||||
*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())
|
||||
)
|
||||
self.db.prefix_db.touched_or_deleted.stage_put(
|
||||
key_args=(height,), value_args=(self.touched_claim_hashes, self.removed_claim_hashes)
|
||||
)
|
||||
|
||||
self.height = height
|
||||
self.db.headers.append(block.header)
|
||||
self.tip = self.coin.header_hash(block.header)
|
||||
|
||||
min_height = self.db.min_undo_height(self.db.db_height)
|
||||
if min_height > 0: # delete undos for blocks deep enough they can't be reorged
|
||||
undo_to_delete = list(self.db.prefix_db.undo.iterate(start=(0,), stop=(min_height,)))
|
||||
for (k, v) in undo_to_delete:
|
||||
self.db.prefix_db.undo.stage_delete((k,), (v,))
|
||||
touched_or_deleted_to_delete = list(self.db.prefix_db.touched_or_deleted.iterate(
|
||||
start=(0,), stop=(min_height,))
|
||||
)
|
||||
for (k, v) in touched_or_deleted_to_delete:
|
||||
self.db.prefix_db.touched_or_deleted.stage_delete(k, v)
|
||||
|
||||
self.db.fs_height = self.height
|
||||
self.db.fs_tx_count = self.tx_count
|
||||
self.db.hist_flush_count += 1
|
||||
self.db.hist_unflushed_count = 0
|
||||
self.db.utxo_flush_count = self.db.hist_flush_count
|
||||
self.db.db_height = self.height
|
||||
self.db.db_tx_count = self.tx_count
|
||||
self.db.db_tip = self.tip
|
||||
self.db.last_flush_tx_count = self.db.fs_tx_count
|
||||
now = time.time()
|
||||
self.db.wall_time += now - self.db.last_flush
|
||||
self.db.last_flush = now
|
||||
|
||||
self.db.write_db_state()
|
||||
|
||||
def clear_after_advance_or_reorg(self):
|
||||
self.db_op_stack.clear()
|
||||
self.txo_to_claim.clear()
|
||||
self.claim_hash_to_txo.clear()
|
||||
self.support_txos_by_claim.clear()
|
||||
|
@ -1399,59 +1570,87 @@ class BlockProcessor:
|
|||
self.pending_channel_counts.clear()
|
||||
self.updated_claims.clear()
|
||||
self.taken_over_names.clear()
|
||||
self.pending_transaction_num_mapping.clear()
|
||||
self.pending_transactions.clear()
|
||||
self.pending_support_amount_change.clear()
|
||||
|
||||
async def backup_block(self):
|
||||
# self.db.assert_flushed(self.flush_data())
|
||||
self.logger.info("backup block %i", self.height)
|
||||
# Check and update self.tip
|
||||
undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
|
||||
if undo_ops is None:
|
||||
raise ChainError(f'no undo information found for height {self.height:,d}')
|
||||
self.db_op_stack.append_op(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
|
||||
self.db_op_stack.apply_packed_undo_ops(undo_ops)
|
||||
|
||||
touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes)
|
||||
assert len(self.db.prefix_db._op_stack) == 0
|
||||
touched_and_deleted = self.db.prefix_db.touched_or_deleted.get(self.height)
|
||||
self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims)
|
||||
self.removed_claims_to_send_es.difference_update(touched_and_deleted.touched_claims)
|
||||
self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims)
|
||||
|
||||
# self.db.assert_flushed(self.flush_data())
|
||||
self.logger.info("backup block %i", self.height)
|
||||
# Check and update self.tip
|
||||
|
||||
self.db.headers.pop()
|
||||
self.db.tx_counts.pop()
|
||||
self.tip = self.coin.header_hash(self.db.headers[-1])
|
||||
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
|
||||
self.db.transaction_num_mapping.pop(self.db.total_transactions.pop())
|
||||
self.tx_count -= 1
|
||||
self.tx_count = self.db.tx_counts[-1]
|
||||
self.height -= 1
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched_hashXs.discard(None)
|
||||
|
||||
assert self.height < self.db.db_height
|
||||
assert not self.db.hist_unflushed
|
||||
|
||||
start_time = time.time()
|
||||
tx_delta = self.tx_count - self.db.last_flush_tx_count
|
||||
###
|
||||
self.db.fs_tx_count = self.tx_count
|
||||
# Truncate header_mc: header count is 1 more than the height.
|
||||
self.db.header_mc.truncate(self.height + 1)
|
||||
###
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.db.hist_flush_count += 1
|
||||
|
||||
while self.db.fs_height > self.height:
|
||||
self.db.fs_height -= 1
|
||||
self.db.utxo_flush_count = self.db.hist_flush_count
|
||||
self.db.db_height = self.height
|
||||
self.db.db_tx_count = self.tx_count
|
||||
self.db.db_tip = self.tip
|
||||
# Flush state last as it reads the wall time.
|
||||
now = time.time()
|
||||
self.db.wall_time += now - self.db.last_flush
|
||||
self.db.last_flush = now
|
||||
self.db.last_flush_tx_count = self.db.fs_tx_count
|
||||
|
||||
await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1)
|
||||
self.clear_after_advance_or_reorg()
|
||||
|
||||
elapsed = self.db.last_flush - start_time
|
||||
self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
|
||||
f'Height {self.height:,d} txs: {self.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
|
||||
hashX = self.coin.hashX_from_script(txout.pk_script)
|
||||
if hashX:
|
||||
self.touched_hashXs.add(hashX)
|
||||
self.utxo_cache[(tx_hash, nout)] = (hashX, txout.value)
|
||||
self.db_op_stack.extend_ops([
|
||||
RevertablePut(
|
||||
*Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value)
|
||||
),
|
||||
RevertablePut(
|
||||
*Prefixes.hashX_utxo.pack_item(tx_hash[:4], tx_num, nout, hashX)
|
||||
)
|
||||
])
|
||||
self.db.prefix_db.utxo.stage_put((hashX, tx_num, nout), (txout.value,))
|
||||
self.db.prefix_db.hashX_utxo.stage_put((tx_hash[:4], tx_num, nout), (hashX,))
|
||||
return hashX
|
||||
|
||||
def get_pending_tx_num(self, tx_hash: bytes) -> int:
|
||||
if tx_hash in self.pending_transaction_num_mapping:
|
||||
return self.pending_transaction_num_mapping[tx_hash]
|
||||
else:
|
||||
return self.db.prefix_db.tx_num.get(tx_hash).tx_num
|
||||
|
||||
def spend_utxo(self, tx_hash: bytes, nout: int):
|
||||
hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))
|
||||
txin_num = self.db.transaction_num_mapping[tx_hash]
|
||||
hdb_key = Prefixes.hashX_utxo.pack_key(tx_hash[:4], txin_num, nout)
|
||||
txin_num = self.get_pending_tx_num(tx_hash)
|
||||
if not hashX:
|
||||
hashX = self.db.db.get(hdb_key)
|
||||
if not hashX:
|
||||
hashX_value = self.db.prefix_db.hashX_utxo.get(tx_hash[:4], txin_num, nout)
|
||||
if not hashX_value:
|
||||
return
|
||||
udb_key = Prefixes.utxo.pack_key(hashX, txin_num, nout)
|
||||
utxo_value_packed = self.db.db.get(udb_key)
|
||||
if utxo_value_packed is None:
|
||||
hashX = hashX_value.hashX
|
||||
utxo_value = self.db.prefix_db.utxo.get(hashX, txin_num, nout)
|
||||
if not utxo_value:
|
||||
self.logger.warning(
|
||||
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX)
|
||||
)
|
||||
|
@ -1459,18 +1658,13 @@ class BlockProcessor:
|
|||
f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}"
|
||||
)
|
||||
self.touched_hashXs.add(hashX)
|
||||
self.db_op_stack.extend_ops([
|
||||
RevertableDelete(hdb_key, hashX),
|
||||
RevertableDelete(udb_key, utxo_value_packed)
|
||||
])
|
||||
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), hashX_value)
|
||||
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), utxo_value)
|
||||
return hashX
|
||||
elif amount is not None:
|
||||
udb_key = Prefixes.utxo.pack_key(hashX, txin_num, nout)
|
||||
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), (hashX,))
|
||||
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), (amount,))
|
||||
self.touched_hashXs.add(hashX)
|
||||
self.db_op_stack.extend_ops([
|
||||
RevertableDelete(hdb_key, hashX),
|
||||
RevertableDelete(udb_key, Prefixes.utxo.pack_value(amount))
|
||||
])
|
||||
return hashX
|
||||
|
||||
async def _process_prefetched_blocks(self):
|
||||
|
@ -1494,7 +1688,15 @@ class BlockProcessor:
|
|||
# Flush everything but with first_sync->False state.
|
||||
first_sync = self.db.first_sync
|
||||
self.db.first_sync = False
|
||||
await self.write_state()
|
||||
|
||||
def flush():
|
||||
assert len(self.db.prefix_db._op_stack) == 0
|
||||
self.db.write_db_state()
|
||||
self.db.prefix_db.unsafe_commit()
|
||||
self.db.assert_db_state()
|
||||
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
if first_sync:
|
||||
self.logger.info(f'{lbry.__version__} synced to '
|
||||
f'height {self.height:,d}, halting here.')
|
||||
|
@ -1516,7 +1718,6 @@ class BlockProcessor:
|
|||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
await self.db.open_dbs()
|
||||
self.db_op_stack = self.db.db_op_stack
|
||||
self.height = self.db.db_height
|
||||
self.tip = self.db.db_tip
|
||||
self.tx_count = self.db.db_tx_count
|
||||
|
|
|
@ -37,4 +37,5 @@ class DB_PREFIXES(enum.Enum):
|
|||
hashx_utxo = b'h'
|
||||
hashx_history = b'x'
|
||||
db_state = b's'
|
||||
trending_spike = b't'
|
||||
channel_count = b'Z'
|
||||
support_amount = b'a'
|
||||
|
|
|
@ -1,258 +0,0 @@
|
|||
import typing
|
||||
from typing import Optional
|
||||
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp
|
||||
from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow
|
||||
from lbry.wallet.server.db.prefixes import RepostPrefixRow, RepostedPrefixRow
|
||||
|
||||
|
||||
def length_encoded_name(name: str) -> bytes:
|
||||
encoded = name.encode('utf-8')
|
||||
return len(encoded).to_bytes(2, byteorder='big') + encoded
|
||||
|
||||
|
||||
class StagedClaimtrieSupport(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
position: int
|
||||
amount: int
|
||||
|
||||
def _get_add_remove_support_utxo_ops(self, add=True):
|
||||
"""
|
||||
get a list of revertable operations to add or spend a support txo to the key: value database
|
||||
|
||||
:param add: if true use RevertablePut operations, otherwise use RevertableDelete
|
||||
:return:
|
||||
"""
|
||||
op = RevertablePut if add else RevertableDelete
|
||||
return [
|
||||
op(
|
||||
*Prefixes.claim_to_support.pack_item(self.claim_hash, self.tx_num, self.position, self.amount)
|
||||
),
|
||||
op(
|
||||
*Prefixes.support_to_claim.pack_item(self.tx_num, self.position, self.claim_hash)
|
||||
)
|
||||
]
|
||||
|
||||
def get_add_support_utxo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_support_utxo_ops(add=True)
|
||||
|
||||
def get_spend_support_txo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_support_utxo_ops(add=False)
|
||||
|
||||
|
||||
class StagedActivation(typing.NamedTuple):
|
||||
txo_type: int
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
position: int
|
||||
activation_height: int
|
||||
name: str
|
||||
amount: int
|
||||
|
||||
def _get_add_remove_activate_ops(self, add=True):
|
||||
op = RevertablePut if add else RevertableDelete
|
||||
# print(f"\t{'add' if add else 'remove'} {'claim' if self.txo_type == ACTIVATED_CLAIM_TXO_TYPE else 'support'},"
|
||||
# f" {self.tx_num}, {self.position}, activation={self.activation_height}, {self.name}, "
|
||||
# f"amount={self.amount}")
|
||||
return [
|
||||
op(
|
||||
*Prefixes.activated.pack_item(
|
||||
self.txo_type, self.tx_num, self.position, self.activation_height, self.claim_hash, self.name
|
||||
)
|
||||
),
|
||||
op(
|
||||
*Prefixes.pending_activation.pack_item(
|
||||
self.activation_height, self.txo_type, self.tx_num, self.position,
|
||||
self.claim_hash, self.name
|
||||
)
|
||||
),
|
||||
op(
|
||||
*Prefixes.active_amount.pack_item(
|
||||
self.claim_hash, self.txo_type, self.activation_height, self.tx_num, self.position, self.amount
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
def get_activate_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_activate_ops(add=True)
|
||||
|
||||
def get_remove_activate_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_activate_ops(add=False)
|
||||
|
||||
|
||||
def get_remove_name_ops(name: str, claim_hash: bytes, height: int) -> typing.List[RevertableDelete]:
|
||||
return [
|
||||
RevertableDelete(
|
||||
*Prefixes.claim_takeover.pack_item(
|
||||
name, claim_hash, height
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int,
|
||||
previous_winning: Optional[ClaimTakeoverValue]):
|
||||
if previous_winning:
|
||||
return [
|
||||
RevertableDelete(
|
||||
*Prefixes.claim_takeover.pack_item(
|
||||
name, previous_winning.claim_hash, previous_winning.height
|
||||
)
|
||||
),
|
||||
RevertablePut(
|
||||
*Prefixes.claim_takeover.pack_item(
|
||||
name, claim_hash, takeover_height
|
||||
)
|
||||
)
|
||||
]
|
||||
return [
|
||||
RevertablePut(
|
||||
*Prefixes.claim_takeover.pack_item(
|
||||
name, claim_hash, takeover_height
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def get_remove_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
|
||||
return [
|
||||
RevertableDelete(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash))
|
||||
]
|
||||
|
||||
|
||||
def get_add_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
|
||||
return [
|
||||
RevertablePut(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash))
|
||||
]
|
||||
|
||||
|
||||
class StagedClaimtrieItem(typing.NamedTuple):
|
||||
name: str
|
||||
normalized_name: str
|
||||
claim_hash: bytes
|
||||
amount: int
|
||||
expiration_height: int
|
||||
tx_num: int
|
||||
position: int
|
||||
root_tx_num: int
|
||||
root_position: int
|
||||
channel_signature_is_valid: bool
|
||||
signing_hash: Optional[bytes]
|
||||
reposted_claim_hash: Optional[bytes]
|
||||
|
||||
@property
|
||||
def is_update(self) -> bool:
|
||||
return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
|
||||
|
||||
def _get_add_remove_claim_utxo_ops(self, add=True):
|
||||
"""
|
||||
get a list of revertable operations to add or spend a claim txo to the key: value database
|
||||
|
||||
:param add: if true use RevertablePut operations, otherwise use RevertableDelete
|
||||
:return:
|
||||
"""
|
||||
op = RevertablePut if add else RevertableDelete
|
||||
ops = [
|
||||
# claim tip by claim hash
|
||||
op(
|
||||
*Prefixes.claim_to_txo.pack_item(
|
||||
self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position,
|
||||
self.amount, self.channel_signature_is_valid, self.name
|
||||
)
|
||||
),
|
||||
# claim hash by txo
|
||||
op(
|
||||
*Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.normalized_name)
|
||||
),
|
||||
# claim expiration
|
||||
op(
|
||||
*Prefixes.claim_expiration.pack_item(
|
||||
self.expiration_height, self.tx_num, self.position, self.claim_hash,
|
||||
self.normalized_name
|
||||
)
|
||||
),
|
||||
# short url resolution
|
||||
]
|
||||
ops.extend([
|
||||
op(
|
||||
*Prefixes.claim_short_id.pack_item(
|
||||
self.normalized_name, self.claim_hash.hex()[:prefix_len + 1], self.root_tx_num, self.root_position,
|
||||
self.tx_num, self.position
|
||||
)
|
||||
) for prefix_len in range(10)
|
||||
])
|
||||
|
||||
if self.signing_hash and self.channel_signature_is_valid:
|
||||
ops.extend([
|
||||
# channel by stream
|
||||
op(
|
||||
*Prefixes.claim_to_channel.pack_item(
|
||||
self.claim_hash, self.tx_num, self.position, self.signing_hash
|
||||
)
|
||||
),
|
||||
# stream by channel
|
||||
op(
|
||||
*Prefixes.channel_to_claim.pack_item(
|
||||
self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash
|
||||
)
|
||||
)
|
||||
])
|
||||
if self.reposted_claim_hash:
|
||||
ops.extend([
|
||||
op(
|
||||
*Prefixes.repost.pack_item(self.claim_hash, self.reposted_claim_hash)
|
||||
),
|
||||
op(
|
||||
*Prefixes.reposted_claim.pack_item(
|
||||
self.reposted_claim_hash, self.tx_num, self.position, self.claim_hash
|
||||
)
|
||||
),
|
||||
|
||||
])
|
||||
return ops
|
||||
|
||||
def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_claim_utxo_ops(add=True)
|
||||
|
||||
def get_spend_claim_txo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_claim_utxo_ops(add=False)
|
||||
|
||||
def get_invalidate_signature_ops(self):
|
||||
if not self.signing_hash:
|
||||
return []
|
||||
ops = [
|
||||
RevertableDelete(
|
||||
*Prefixes.claim_to_channel.pack_item(
|
||||
self.claim_hash, self.tx_num, self.position, self.signing_hash
|
||||
)
|
||||
)
|
||||
]
|
||||
if self.channel_signature_is_valid:
|
||||
ops.extend([
|
||||
# delete channel_to_claim/claim_to_channel
|
||||
RevertableDelete(
|
||||
*Prefixes.channel_to_claim.pack_item(
|
||||
self.signing_hash, self.normalized_name, self.tx_num, self.position, self.claim_hash
|
||||
)
|
||||
),
|
||||
# update claim_to_txo with channel_signature_is_valid=False
|
||||
RevertableDelete(
|
||||
*Prefixes.claim_to_txo.pack_item(
|
||||
self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position,
|
||||
self.amount, self.channel_signature_is_valid, self.name
|
||||
)
|
||||
),
|
||||
RevertablePut(
|
||||
*Prefixes.claim_to_txo.pack_item(
|
||||
self.claim_hash, self.tx_num, self.position, self.root_tx_num, self.root_position,
|
||||
self.amount, False, self.name
|
||||
)
|
||||
)
|
||||
])
|
||||
return ops
|
||||
|
||||
def invalidate_signature(self) -> 'StagedClaimtrieItem':
|
||||
return StagedClaimtrieItem(
|
||||
self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
|
||||
self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
|
||||
)
|
103
lbry/wallet/server/db/db.py
Normal file
103
lbry/wallet/server/db/db.py
Normal file
|
@ -0,0 +1,103 @@
|
|||
import struct
|
||||
from typing import Optional
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack
|
||||
|
||||
|
||||
class KeyValueStorage:
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
raise NotImplemented()
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
raise NotImplemented()
|
||||
|
||||
def write_batch(self, transaction: bool = False):
|
||||
raise NotImplemented()
|
||||
|
||||
def close(self):
|
||||
raise NotImplemented()
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
raise NotImplemented()
|
||||
|
||||
|
||||
class PrefixDB:
|
||||
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
||||
|
||||
def __init__(self, db: KeyValueStorage, unsafe_prefixes=None):
|
||||
self._db = db
|
||||
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
||||
|
||||
def unsafe_commit(self):
|
||||
"""
|
||||
Write staged changes to the database without keeping undo information
|
||||
Changes written cannot be undone
|
||||
"""
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
finally:
|
||||
self._op_stack.clear()
|
||||
|
||||
def commit(self, height: int):
|
||||
"""
|
||||
Write changes for a block height to the database and keep undo information so that the changes can be reverted
|
||||
"""
|
||||
undo_ops = self._op_stack.get_undo_ops()
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
batch_put(DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height), undo_ops)
|
||||
finally:
|
||||
self._op_stack.clear()
|
||||
|
||||
def rollback(self, height: int):
|
||||
"""
|
||||
Revert changes for a block height
|
||||
"""
|
||||
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
|
||||
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
batch_delete(undo_key)
|
||||
finally:
|
||||
self._op_stack.clear()
|
||||
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
return self._db.get(key, fill_cache=fill_cache)
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
return self._db.iterator(
|
||||
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
|
||||
)
|
||||
|
||||
def close(self):
|
||||
if not self._db.closed:
|
||||
self._db.close()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._db.closed
|
|
@ -2,9 +2,9 @@ import typing
|
|||
import struct
|
||||
import array
|
||||
import base64
|
||||
import plyvel
|
||||
from typing import Union, Tuple, NamedTuple, Optional
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||
from lbry.schema.url import normalize_name
|
||||
|
||||
|
@ -38,13 +38,13 @@ class PrefixRow(metaclass=PrefixRowType):
|
|||
value_struct: struct.Struct
|
||||
key_part_lambdas = []
|
||||
|
||||
def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack):
|
||||
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
|
||||
self._db = db
|
||||
self._op_stack = op_stack
|
||||
|
||||
def iterate(self, prefix=None, start=None, stop=None,
|
||||
reverse: bool = False, include_key: bool = True, include_value: bool = True,
|
||||
fill_cache: bool = True):
|
||||
fill_cache: bool = True, deserialize_key: bool = True, deserialize_value: bool = True):
|
||||
if not prefix and not start and not stop:
|
||||
prefix = ()
|
||||
if prefix is not None:
|
||||
|
@ -54,25 +54,36 @@ class PrefixRow(metaclass=PrefixRowType):
|
|||
if stop is not None:
|
||||
stop = self.pack_partial_key(*stop)
|
||||
|
||||
if deserialize_key:
|
||||
key_getter = lambda k: self.unpack_key(k)
|
||||
else:
|
||||
key_getter = lambda k: k
|
||||
if deserialize_value:
|
||||
value_getter = lambda v: self.unpack_value(v)
|
||||
else:
|
||||
value_getter = lambda v: v
|
||||
|
||||
if include_key and include_value:
|
||||
for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse,
|
||||
fill_cache=fill_cache):
|
||||
yield self.unpack_key(k), self.unpack_value(v)
|
||||
yield key_getter(k), value_getter(v)
|
||||
elif include_key:
|
||||
for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False,
|
||||
fill_cache=fill_cache):
|
||||
yield self.unpack_key(k)
|
||||
yield key_getter(k)
|
||||
elif include_value:
|
||||
for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False,
|
||||
fill_cache=fill_cache):
|
||||
yield self.unpack_value(v)
|
||||
yield value_getter(v)
|
||||
else:
|
||||
raise RuntimeError
|
||||
for _ in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False,
|
||||
include_value=False, fill_cache=fill_cache):
|
||||
yield None
|
||||
|
||||
def get(self, *key_args, fill_cache=True):
|
||||
def get(self, *key_args, fill_cache=True, deserialize_value=True):
|
||||
v = self._db.get(self.pack_key(*key_args), fill_cache=fill_cache)
|
||||
if v:
|
||||
return self.unpack_value(v)
|
||||
return v if not deserialize_value else self.unpack_value(v)
|
||||
|
||||
def stage_put(self, key_args=(), value_args=()):
|
||||
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))
|
||||
|
@ -303,6 +314,28 @@ class ChannelToClaimValue(typing.NamedTuple):
|
|||
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
|
||||
|
||||
|
||||
class ChannelCountKey(typing.NamedTuple):
|
||||
channel_hash: bytes
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}(channel_hash={self.channel_hash.hex()})"
|
||||
|
||||
|
||||
class ChannelCountValue(typing.NamedTuple):
|
||||
count: int
|
||||
|
||||
|
||||
class SupportAmountKey(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
|
||||
|
||||
|
||||
class SupportAmountValue(typing.NamedTuple):
|
||||
amount: int
|
||||
|
||||
|
||||
class ClaimToSupportKey(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
|
@ -469,6 +502,20 @@ class TouchedOrDeletedClaimValue(typing.NamedTuple):
|
|||
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
|
||||
|
||||
|
||||
class DBState(typing.NamedTuple):
|
||||
genesis: bytes
|
||||
height: int
|
||||
tx_count: int
|
||||
tip: bytes
|
||||
utxo_flush_count: int
|
||||
wall_time: int
|
||||
first_sync: bool
|
||||
db_version: int
|
||||
hist_flush_count: int
|
||||
comp_flush_count: int
|
||||
comp_cursor: int
|
||||
|
||||
|
||||
class ActiveAmountPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.active_amount.value
|
||||
key_struct = struct.Struct(b'>20sBLLH')
|
||||
|
@ -514,9 +561,7 @@ class ClaimToTXOPrefixRow(PrefixRow):
|
|||
|
||||
@classmethod
|
||||
def pack_key(cls, claim_hash: bytes):
|
||||
return super().pack_key(
|
||||
claim_hash
|
||||
)
|
||||
return super().pack_key(claim_hash)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ClaimToTXOKey:
|
||||
|
@ -972,6 +1017,11 @@ class EffectiveAmountPrefixRow(PrefixRow):
|
|||
class RepostPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.repost.value
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>20s').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, claim_hash: bytes):
|
||||
return cls.prefix + claim_hash
|
||||
|
@ -1031,6 +1081,11 @@ class UndoPrefixRow(PrefixRow):
|
|||
prefix = DB_PREFIXES.undo.value
|
||||
key_struct = struct.Struct(b'>Q')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>Q').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int):
|
||||
return super().pack_key(height)
|
||||
|
@ -1059,6 +1114,11 @@ class BlockHashPrefixRow(PrefixRow):
|
|||
key_struct = struct.Struct(b'>L')
|
||||
value_struct = struct.Struct(b'>32s')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>L').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int) -> bytes:
|
||||
return super().pack_key(height)
|
||||
|
@ -1085,6 +1145,11 @@ class BlockHeaderPrefixRow(PrefixRow):
|
|||
key_struct = struct.Struct(b'>L')
|
||||
value_struct = struct.Struct(b'>112s')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>L').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int) -> bytes:
|
||||
return super().pack_key(height)
|
||||
|
@ -1111,6 +1176,11 @@ class TXNumPrefixRow(PrefixRow):
|
|||
key_struct = struct.Struct(b'>32s')
|
||||
value_struct = struct.Struct(b'>L')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>32s').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, tx_hash: bytes) -> bytes:
|
||||
return super().pack_key(tx_hash)
|
||||
|
@ -1137,6 +1207,11 @@ class TxCountPrefixRow(PrefixRow):
|
|||
key_struct = struct.Struct(b'>L')
|
||||
value_struct = struct.Struct(b'>L')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>L').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int) -> bytes:
|
||||
return super().pack_key(height)
|
||||
|
@ -1163,6 +1238,11 @@ class TXHashPrefixRow(PrefixRow):
|
|||
key_struct = struct.Struct(b'>L')
|
||||
value_struct = struct.Struct(b'>32s')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>L').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, tx_num: int) -> bytes:
|
||||
return super().pack_key(tx_num)
|
||||
|
@ -1188,6 +1268,11 @@ class TXPrefixRow(PrefixRow):
|
|||
prefix = DB_PREFIXES.tx.value
|
||||
key_struct = struct.Struct(b'>32s')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>32s').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, tx_hash: bytes) -> bytes:
|
||||
return super().pack_key(tx_hash)
|
||||
|
@ -1313,6 +1398,10 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
|
|||
prefix = DB_PREFIXES.claim_diff.value
|
||||
key_struct = struct.Struct(b'>L')
|
||||
value_struct = struct.Struct(b'>LL')
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>L').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int):
|
||||
|
@ -1324,16 +1413,19 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
|
|||
|
||||
@classmethod
|
||||
def pack_value(cls, touched, deleted) -> bytes:
|
||||
assert True if not touched else all(len(item) == 20 for item in touched)
|
||||
assert True if not deleted else all(len(item) == 20 for item in deleted)
|
||||
return cls.value_struct.pack(len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> TouchedOrDeletedClaimValue:
|
||||
touched_len, deleted_len = cls.value_struct.unpack(data[:8])
|
||||
assert len(data) == 20 * (touched_len + deleted_len) + 8
|
||||
touched_bytes, deleted_bytes = data[8:touched_len*20+8], data[touched_len*20+8:touched_len*20+deleted_len*20+8]
|
||||
data = data[8:]
|
||||
assert len(data) == 20 * (touched_len + deleted_len)
|
||||
touched_bytes, deleted_bytes = data[:touched_len*20], data[touched_len*20:]
|
||||
return TouchedOrDeletedClaimValue(
|
||||
{touched_bytes[8+20*i:8+20*(i+1)] for i in range(touched_len)},
|
||||
{deleted_bytes[8+20*i:8+20*(i+1)] for i in range(deleted_len)}
|
||||
{touched_bytes[20*i:20*(i+1)] for i in range(touched_len)},
|
||||
{deleted_bytes[20*i:20*(i+1)] for i in range(deleted_len)}
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
@ -1341,87 +1433,170 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
|
|||
return cls.pack_key(height), cls.pack_value(touched, deleted)
|
||||
|
||||
|
||||
class Prefixes:
|
||||
claim_to_support = ClaimToSupportPrefixRow
|
||||
support_to_claim = SupportToClaimPrefixRow
|
||||
class ChannelCountPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.channel_count.value
|
||||
key_struct = struct.Struct(b'>20s')
|
||||
value_struct = struct.Struct(b'>L')
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>20s').pack
|
||||
]
|
||||
|
||||
claim_to_txo = ClaimToTXOPrefixRow
|
||||
txo_to_claim = TXOToClaimPrefixRow
|
||||
@classmethod
|
||||
def pack_key(cls, channel_hash: int):
|
||||
return super().pack_key(channel_hash)
|
||||
|
||||
claim_to_channel = ClaimToChannelPrefixRow
|
||||
channel_to_claim = ChannelToClaimPrefixRow
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ChannelCountKey:
|
||||
return ChannelCountKey(*super().unpack_key(key))
|
||||
|
||||
claim_short_id = ClaimShortIDPrefixRow
|
||||
claim_expiration = ClaimExpirationPrefixRow
|
||||
@classmethod
|
||||
def pack_value(cls, count: int) -> bytes:
|
||||
return super().pack_value(count)
|
||||
|
||||
claim_takeover = ClaimTakeoverPrefixRow
|
||||
pending_activation = PendingActivationPrefixRow
|
||||
activated = ActivatedPrefixRow
|
||||
active_amount = ActiveAmountPrefixRow
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> ChannelCountValue:
|
||||
return ChannelCountValue(*super().unpack_value(data))
|
||||
|
||||
effective_amount = EffectiveAmountPrefixRow
|
||||
|
||||
repost = RepostPrefixRow
|
||||
reposted_claim = RepostedPrefixRow
|
||||
|
||||
undo = UndoPrefixRow
|
||||
utxo = UTXOPrefixRow
|
||||
hashX_utxo = HashXUTXOPrefixRow
|
||||
hashX_history = HashXHistoryPrefixRow
|
||||
block_hash = BlockHashPrefixRow
|
||||
tx_count = TxCountPrefixRow
|
||||
tx_hash = TXHashPrefixRow
|
||||
tx_num = TXNumPrefixRow
|
||||
tx = TXPrefixRow
|
||||
header = BlockHeaderPrefixRow
|
||||
touched_or_deleted = TouchedOrDeletedPrefixRow
|
||||
@classmethod
|
||||
def pack_item(cls, channel_hash, count):
|
||||
return cls.pack_key(channel_hash), cls.pack_value(count)
|
||||
|
||||
|
||||
class PrefixDB:
|
||||
def __init__(self, db: plyvel.DB, op_stack: RevertableOpStack):
|
||||
self._db = db
|
||||
self._op_stack = op_stack
|
||||
class SupportAmountPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.support_amount.value
|
||||
key_struct = struct.Struct(b'>20s')
|
||||
value_struct = struct.Struct(b'>Q')
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>20s').pack
|
||||
]
|
||||
|
||||
self.claim_to_support = ClaimToSupportPrefixRow(db, op_stack)
|
||||
self.support_to_claim = SupportToClaimPrefixRow(db, op_stack)
|
||||
self.claim_to_txo = ClaimToTXOPrefixRow(db, op_stack)
|
||||
self.txo_to_claim = TXOToClaimPrefixRow(db, op_stack)
|
||||
self.claim_to_channel = ClaimToChannelPrefixRow(db, op_stack)
|
||||
self.channel_to_claim = ChannelToClaimPrefixRow(db, op_stack)
|
||||
self.claim_short_id = ClaimShortIDPrefixRow(db, op_stack)
|
||||
self.claim_expiration = ClaimExpirationPrefixRow(db, op_stack)
|
||||
self.claim_takeover = ClaimTakeoverPrefixRow(db, op_stack)
|
||||
self.pending_activation = PendingActivationPrefixRow(db, op_stack)
|
||||
self.activated = ActivatedPrefixRow(db, op_stack)
|
||||
self.active_amount = ActiveAmountPrefixRow(db, op_stack)
|
||||
self.effective_amount = EffectiveAmountPrefixRow(db, op_stack)
|
||||
self.repost = RepostPrefixRow(db, op_stack)
|
||||
self.reposted_claim = RepostedPrefixRow(db, op_stack)
|
||||
self.undo = UndoPrefixRow(db, op_stack)
|
||||
self.utxo = UTXOPrefixRow(db, op_stack)
|
||||
self.hashX_utxo = HashXUTXOPrefixRow(db, op_stack)
|
||||
self.hashX_history = HashXHistoryPrefixRow(db, op_stack)
|
||||
self.block_hash = BlockHashPrefixRow(db, op_stack)
|
||||
self.tx_count = TxCountPrefixRow(db, op_stack)
|
||||
self.tx_hash = TXHashPrefixRow(db, op_stack)
|
||||
self.tx_num = TXNumPrefixRow(db, op_stack)
|
||||
self.tx = TXPrefixRow(db, op_stack)
|
||||
self.header = BlockHeaderPrefixRow(db, op_stack)
|
||||
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack)
|
||||
@classmethod
|
||||
def pack_key(cls, claim_hash: bytes):
|
||||
return super().pack_key(claim_hash)
|
||||
|
||||
def commit(self):
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> SupportAmountKey:
|
||||
return SupportAmountKey(*super().unpack_key(key))
|
||||
|
||||
for staged_change in self._op_stack:
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
finally:
|
||||
self._op_stack.clear()
|
||||
@classmethod
|
||||
def pack_value(cls, amount: int) -> bytes:
|
||||
return super().pack_value(amount)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> SupportAmountValue:
|
||||
return SupportAmountValue(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, claim_hash, amount):
|
||||
return cls.pack_key(claim_hash), cls.pack_value(amount)
|
||||
|
||||
|
||||
class DBStatePrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.db_state.value
|
||||
value_struct = struct.Struct(b'>32sLL32sLLBBlll')
|
||||
key_struct = struct.Struct(b'')
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b''
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls) -> bytes:
|
||||
return cls.prefix
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes):
|
||||
return
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
|
||||
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
comp_cursor: int) -> bytes:
|
||||
return super().pack_value(
|
||||
genesis, height, tx_count, tip, utxo_flush_count,
|
||||
wall_time, 1 if first_sync else 0, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> DBState:
|
||||
return DBState(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
|
||||
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
comp_cursor: int):
|
||||
return cls.pack_key(), cls.pack_value(
|
||||
genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor
|
||||
)
|
||||
|
||||
|
||||
class LevelDBStore(KeyValueStorage):
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
||||
import plyvel
|
||||
self.db = plyvel.DB(
|
||||
path, create_if_missing=True, max_open_files=max_open_files,
|
||||
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
||||
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
||||
)
|
||||
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
return self.db.get(key, fill_cache=fill_cache)
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
return self.db.iterator(
|
||||
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
|
||||
)
|
||||
|
||||
def write_batch(self, transaction: bool = False, sync: bool = False):
|
||||
return self.db.write_batch(transaction=transaction, sync=sync)
|
||||
|
||||
def close(self):
|
||||
return self.db.close()
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
return self.db.closed
|
||||
|
||||
|
||||
class HubDB(PrefixDB):
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int = 512):
|
||||
db = LevelDBStore(path, cache_mb, max_open_files)
|
||||
super().__init__(db, unsafe_prefixes={DB_PREFIXES.db_state.value})
|
||||
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
||||
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
||||
self.claim_to_txo = ClaimToTXOPrefixRow(db, self._op_stack)
|
||||
self.txo_to_claim = TXOToClaimPrefixRow(db, self._op_stack)
|
||||
self.claim_to_channel = ClaimToChannelPrefixRow(db, self._op_stack)
|
||||
self.channel_to_claim = ChannelToClaimPrefixRow(db, self._op_stack)
|
||||
self.claim_short_id = ClaimShortIDPrefixRow(db, self._op_stack)
|
||||
self.claim_expiration = ClaimExpirationPrefixRow(db, self._op_stack)
|
||||
self.claim_takeover = ClaimTakeoverPrefixRow(db, self._op_stack)
|
||||
self.pending_activation = PendingActivationPrefixRow(db, self._op_stack)
|
||||
self.activated = ActivatedPrefixRow(db, self._op_stack)
|
||||
self.active_amount = ActiveAmountPrefixRow(db, self._op_stack)
|
||||
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
|
||||
self.repost = RepostPrefixRow(db, self._op_stack)
|
||||
self.reposted_claim = RepostedPrefixRow(db, self._op_stack)
|
||||
self.undo = UndoPrefixRow(db, self._op_stack)
|
||||
self.utxo = UTXOPrefixRow(db, self._op_stack)
|
||||
self.hashX_utxo = HashXUTXOPrefixRow(db, self._op_stack)
|
||||
self.hashX_history = HashXHistoryPrefixRow(db, self._op_stack)
|
||||
self.block_hash = BlockHashPrefixRow(db, self._op_stack)
|
||||
self.tx_count = TxCountPrefixRow(db, self._op_stack)
|
||||
self.tx_hash = TXHashPrefixRow(db, self._op_stack)
|
||||
self.tx_num = TXNumPrefixRow(db, self._op_stack)
|
||||
self.tx = TXPrefixRow(db, self._op_stack)
|
||||
self.header = BlockHeaderPrefixRow(db, self._op_stack)
|
||||
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, self._op_stack)
|
||||
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
|
||||
self.db_state = DBStatePrefixRow(db, self._op_stack)
|
||||
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
|
||||
|
||||
|
||||
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
||||
|
|
|
@ -14,10 +14,8 @@ import array
|
|||
import time
|
||||
import typing
|
||||
import struct
|
||||
import attr
|
||||
import zlib
|
||||
import base64
|
||||
import plyvel
|
||||
from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List, TYPE_CHECKING
|
||||
from functools import partial
|
||||
from asyncio import sleep
|
||||
|
@ -32,10 +30,8 @@ from lbry.wallet.server import util
|
|||
from lbry.wallet.server.hash import hash_to_hex_str
|
||||
from lbry.wallet.server.tx import TxInput
|
||||
from lbry.wallet.server.merkle import Merkle, MerkleCache
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack
|
||||
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
|
||||
from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB
|
||||
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
|
||||
from lbry.wallet.transaction import OutputScript
|
||||
|
@ -59,46 +55,8 @@ class UTXO(typing.NamedTuple):
|
|||
TXO_STRUCT = struct.Struct(b'>LH')
|
||||
TXO_STRUCT_unpack = TXO_STRUCT.unpack
|
||||
TXO_STRUCT_pack = TXO_STRUCT.pack
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class FlushData:
|
||||
height = attr.ib()
|
||||
tx_count = attr.ib()
|
||||
put_and_delete_ops = attr.ib()
|
||||
tip = attr.ib()
|
||||
|
||||
|
||||
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]]
|
||||
|
||||
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sLLBBlll')
|
||||
DB_STATE_STRUCT_SIZE = 94
|
||||
|
||||
|
||||
class DBState(typing.NamedTuple):
|
||||
genesis: bytes
|
||||
height: int
|
||||
tx_count: int
|
||||
tip: bytes
|
||||
utxo_flush_count: int
|
||||
wall_time: int
|
||||
first_sync: bool
|
||||
db_version: int
|
||||
hist_flush_count: int
|
||||
comp_flush_count: int
|
||||
comp_cursor: int
|
||||
|
||||
def pack(self) -> bytes:
|
||||
return DB_STATE_STRUCT.pack(
|
||||
self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count,
|
||||
self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count,
|
||||
self.comp_flush_count, self.comp_cursor
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack(cls, packed: bytes) -> 'DBState':
|
||||
return cls(*DB_STATE_STRUCT.unpack(packed[:DB_STATE_STRUCT_SIZE]))
|
||||
|
||||
|
||||
class DBError(Exception):
|
||||
"""Raised on general DB errors generally indicating corruption."""
|
||||
|
@ -114,7 +72,6 @@ class LevelDB:
|
|||
|
||||
self.logger.info(f'switching current directory to {env.db_dir}')
|
||||
|
||||
self.db = None
|
||||
self.prefix_db = None
|
||||
|
||||
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
|
||||
|
@ -149,8 +106,6 @@ class LevelDB:
|
|||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||
|
||||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||
self.total_transactions = None
|
||||
self.transaction_num_mapping = {}
|
||||
|
||||
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
||||
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
|
||||
|
@ -173,65 +128,57 @@ class LevelDB:
|
|||
self.ledger = RegTestLedger
|
||||
|
||||
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
||||
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
|
||||
claim_hash_and_name = self.prefix_db.txo_to_claim.get(tx_num, tx_idx)
|
||||
if not claim_hash_and_name:
|
||||
return
|
||||
return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name)
|
||||
return claim_hash_and_name
|
||||
|
||||
def get_repost(self, claim_hash) -> Optional[bytes]:
|
||||
repost = self.db.get(Prefixes.repost.pack_key(claim_hash))
|
||||
repost = self.prefix_db.repost.get(claim_hash)
|
||||
if repost:
|
||||
return Prefixes.repost.unpack_value(repost).reposted_claim_hash
|
||||
return repost.reposted_claim_hash
|
||||
return
|
||||
|
||||
def get_reposted_count(self, claim_hash: bytes) -> int:
|
||||
cnt = 0
|
||||
for _ in self.db.iterator(prefix=Prefixes.reposted_claim.pack_partial_key(claim_hash)):
|
||||
cnt += 1
|
||||
return cnt
|
||||
return sum(
|
||||
1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False)
|
||||
)
|
||||
|
||||
def get_activation(self, tx_num, position, is_support=False) -> int:
|
||||
activation = self.db.get(
|
||||
Prefixes.activated.pack_key(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position
|
||||
)
|
||||
activation = self.prefix_db.activated.get(
|
||||
ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position
|
||||
)
|
||||
if activation:
|
||||
return Prefixes.activated.unpack_value(activation).height
|
||||
return activation.height
|
||||
return -1
|
||||
|
||||
def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]:
|
||||
key = Prefixes.support_to_claim.pack_key(tx_num, position)
|
||||
supported_claim_hash = self.db.get(key)
|
||||
supported_claim_hash = self.prefix_db.support_to_claim.get(tx_num, position)
|
||||
if supported_claim_hash:
|
||||
packed_support_amount = self.db.get(
|
||||
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, position)
|
||||
packed_support_amount = self.prefix_db.claim_to_support.get(
|
||||
supported_claim_hash.claim_hash, tx_num, position
|
||||
)
|
||||
if packed_support_amount:
|
||||
return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount
|
||||
return supported_claim_hash.claim_hash, packed_support_amount.amount
|
||||
return None, None
|
||||
|
||||
def get_support_amount(self, claim_hash: bytes):
|
||||
total = 0
|
||||
for packed in self.db.iterator(prefix=Prefixes.claim_to_support.pack_partial_key(claim_hash), include_key=False):
|
||||
total += Prefixes.claim_to_support.unpack_value(packed).amount
|
||||
return total
|
||||
support_amount_val = self.prefix_db.support_amount.get(claim_hash)
|
||||
if support_amount_val is None:
|
||||
return 0
|
||||
return support_amount_val.amount
|
||||
|
||||
def get_supports(self, claim_hash: bytes):
|
||||
supports = []
|
||||
for k, v in self.db.iterator(prefix=Prefixes.claim_to_support.pack_partial_key(claim_hash)):
|
||||
unpacked_k = Prefixes.claim_to_support.unpack_key(k)
|
||||
unpacked_v = Prefixes.claim_to_support.unpack_value(v)
|
||||
supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount))
|
||||
return supports
|
||||
return [
|
||||
(k.tx_num, k.position, v.amount) for k, v in self.prefix_db.claim_to_support.iterate(prefix=(claim_hash,))
|
||||
]
|
||||
|
||||
def get_short_claim_id_url(self, name: str, normalized_name: str, claim_hash: bytes,
|
||||
root_tx_num: int, root_position: int) -> str:
|
||||
claim_id = claim_hash.hex()
|
||||
for prefix_len in range(10):
|
||||
prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:prefix_len+1])
|
||||
for _k in self.db.iterator(prefix=prefix, include_value=False):
|
||||
k = Prefixes.claim_short_id.unpack_key(_k)
|
||||
for k in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:prefix_len+1]),
|
||||
include_value=False):
|
||||
if k.root_tx_num == root_tx_num and k.root_position == root_position:
|
||||
return f'{name}#{k.partial_claim_id}'
|
||||
break
|
||||
|
@ -247,7 +194,7 @@ class LevelDB:
|
|||
normalized_name = name
|
||||
controlling_claim = self.get_controlling_claim(normalized_name)
|
||||
|
||||
tx_hash = self.total_transactions[tx_num]
|
||||
tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
|
||||
height = bisect_right(self.tx_counts, tx_num)
|
||||
created_height = bisect_right(self.tx_counts, root_tx_num)
|
||||
last_take_over_height = controlling_claim.height
|
||||
|
@ -314,10 +261,7 @@ class LevelDB:
|
|||
self.get_activation(claim_txo.tx_num, claim_txo.position), claim_txo.channel_signature_is_valid
|
||||
)
|
||||
# resolve by partial/complete claim id
|
||||
prefix = Prefixes.claim_short_id.pack_partial_key(normalized_name, claim_id[:10])
|
||||
for k, v in self.db.iterator(prefix=prefix):
|
||||
key = Prefixes.claim_short_id.unpack_key(k)
|
||||
claim_txo = Prefixes.claim_short_id.unpack_value(v)
|
||||
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])):
|
||||
claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position]
|
||||
non_normalized_name = self.claim_to_txo.get(claim_hash).name
|
||||
signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid
|
||||
|
@ -329,12 +273,9 @@ class LevelDB:
|
|||
return
|
||||
|
||||
# resolve by amount ordering, 1 indexed
|
||||
prefix = Prefixes.effective_amount.pack_partial_key(normalized_name)
|
||||
for idx, (k, v) in enumerate(self.db.iterator(prefix=prefix)):
|
||||
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
|
||||
if amount_order > idx + 1:
|
||||
continue
|
||||
key = Prefixes.effective_amount.unpack_key(k)
|
||||
claim_val = Prefixes.effective_amount.unpack_value(v)
|
||||
claim_txo = self.claim_to_txo.get(claim_val.claim_hash)
|
||||
activation = self.get_activation(key.tx_num, key.position)
|
||||
return self._prepare_resolve_result(
|
||||
|
@ -345,9 +286,7 @@ class LevelDB:
|
|||
|
||||
def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str):
|
||||
candidates = []
|
||||
for k, v in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash, normalized_name)):
|
||||
key = Prefixes.channel_to_claim.unpack_key(k)
|
||||
stream = Prefixes.channel_to_claim.unpack_value(v)
|
||||
for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)):
|
||||
effective_amount = self.get_effective_amount(stream.claim_hash)
|
||||
if not candidates or candidates[-1][-1] == effective_amount:
|
||||
candidates.append((stream.claim_hash, key.tx_num, key.position, effective_amount))
|
||||
|
@ -362,7 +301,7 @@ class LevelDB:
|
|||
try:
|
||||
parsed = URL.parse(url)
|
||||
except ValueError as e:
|
||||
return e, None
|
||||
return e, None, None
|
||||
|
||||
stream = channel = resolved_channel = resolved_stream = None
|
||||
if parsed.has_stream_in_channel:
|
||||
|
@ -428,9 +367,9 @@ class LevelDB:
|
|||
return claim.amount
|
||||
|
||||
def get_block_hash(self, height: int) -> Optional[bytes]:
|
||||
v = self.db.get(Prefixes.block_hash.pack_key(height))
|
||||
v = self.prefix_db.block_hash.get(height)
|
||||
if v:
|
||||
return Prefixes.block_hash.unpack_value(v).block_hash
|
||||
return v.block_hash
|
||||
|
||||
def get_support_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]:
|
||||
v = self.prefix_db.claim_to_support.get(claim_hash, tx_num, position)
|
||||
|
@ -467,19 +406,19 @@ class LevelDB:
|
|||
|
||||
def get_claims_for_name(self, name):
|
||||
claims = []
|
||||
prefix = Prefixes.claim_short_id.pack_partial_key(name) + bytes([1])
|
||||
for _k, _v in self.db.iterator(prefix=prefix):
|
||||
v = Prefixes.claim_short_id.unpack_value(_v)
|
||||
prefix = self.prefix_db.claim_short_id.pack_partial_key(name) + bytes([1])
|
||||
for _k, _v in self.prefix_db.iterator(prefix=prefix):
|
||||
v = self.prefix_db.claim_short_id.unpack_value(_v)
|
||||
claim_hash = self.get_claim_from_txo(v.tx_num, v.position).claim_hash
|
||||
if claim_hash not in claims:
|
||||
claims.append(claim_hash)
|
||||
return claims
|
||||
|
||||
def get_claims_in_channel_count(self, channel_hash) -> int:
|
||||
count = 0
|
||||
for _ in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash,), include_key=False):
|
||||
count += 1
|
||||
return count
|
||||
channel_count_val = self.prefix_db.channel_count.get(channel_hash)
|
||||
if channel_count_val is None:
|
||||
return 0
|
||||
return channel_count_val.count
|
||||
|
||||
async def reload_blocking_filtering_streams(self):
|
||||
def reload():
|
||||
|
@ -506,14 +445,15 @@ class LevelDB:
|
|||
return streams, channels
|
||||
|
||||
def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]:
|
||||
return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))
|
||||
v = self.prefix_db.claim_to_channel.get(claim_hash, tx_num, position)
|
||||
if v:
|
||||
return v.signing_hash
|
||||
|
||||
def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
|
||||
expired = {}
|
||||
for _k, _v in self.db.iterator(prefix=Prefixes.claim_expiration.pack_partial_key(height)):
|
||||
k, v = Prefixes.claim_expiration.unpack_item(_k, _v)
|
||||
tx_hash = self.total_transactions[k.tx_num]
|
||||
tx = self.coin.transaction(self.db.get(Prefixes.tx.pack_key(tx_hash)))
|
||||
for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)):
|
||||
tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False)
|
||||
tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False))
|
||||
# treat it like a claim spend so it will delete/abandon properly
|
||||
# the _spend_claim function this result is fed to expects a txi, so make a mock one
|
||||
# print(f"\texpired lbry://{v.name} {v.claim_hash.hex()}")
|
||||
|
@ -524,21 +464,21 @@ class LevelDB:
|
|||
return expired
|
||||
|
||||
def get_controlling_claim(self, name: str) -> Optional[ClaimTakeoverValue]:
|
||||
controlling = self.db.get(Prefixes.claim_takeover.pack_key(name))
|
||||
controlling = self.prefix_db.claim_takeover.get(name)
|
||||
if not controlling:
|
||||
return
|
||||
return Prefixes.claim_takeover.unpack_value(controlling)
|
||||
return controlling
|
||||
|
||||
def get_claim_txos_for_name(self, name: str):
|
||||
txos = {}
|
||||
prefix = Prefixes.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big')
|
||||
for k, v in self.db.iterator(prefix=prefix):
|
||||
tx_num, nout = Prefixes.claim_short_id.unpack_value(v)
|
||||
prefix = self.prefix_db.claim_short_id.pack_partial_key(name) + int(1).to_bytes(1, byteorder='big')
|
||||
for k, v in self.prefix_db.iterator(prefix=prefix):
|
||||
tx_num, nout = self.prefix_db.claim_short_id.unpack_value(v)
|
||||
txos[self.get_claim_from_txo(tx_num, nout).claim_hash] = tx_num, nout
|
||||
return txos
|
||||
|
||||
def get_claim_metadata(self, tx_hash, nout):
|
||||
raw = self.db.get(Prefixes.tx.pack_key(tx_hash))
|
||||
raw = self.prefix_db.tx.get(tx_hash, deserialize_value=False)
|
||||
try:
|
||||
output = self.coin.transaction(raw).outputs[nout]
|
||||
script = OutputScript(output.pk_script)
|
||||
|
@ -547,7 +487,7 @@ class LevelDB:
|
|||
except:
|
||||
self.logger.error(
|
||||
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
|
||||
raw.hex()
|
||||
(raw or b'').hex()
|
||||
)
|
||||
return
|
||||
|
||||
|
@ -577,7 +517,7 @@ class LevelDB:
|
|||
if not reposted_claim:
|
||||
return
|
||||
reposted_metadata = self.get_claim_metadata(
|
||||
self.total_transactions[reposted_claim.tx_num], reposted_claim.position
|
||||
self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position
|
||||
)
|
||||
if not reposted_metadata:
|
||||
return
|
||||
|
@ -591,8 +531,8 @@ class LevelDB:
|
|||
reposted_fee_currency = None
|
||||
reposted_duration = None
|
||||
if reposted_claim:
|
||||
reposted_tx_hash = self.total_transactions[reposted_claim.tx_num]
|
||||
raw_reposted_claim_tx = self.db.get(Prefixes.tx.pack_key(reposted_tx_hash))
|
||||
reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False)
|
||||
raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False)
|
||||
try:
|
||||
reposted_claim_txo = self.coin.transaction(
|
||||
raw_reposted_claim_tx
|
||||
|
@ -727,7 +667,7 @@ class LevelDB:
|
|||
batch = []
|
||||
for claim_hash, claim_txo in self.claim_to_txo.items():
|
||||
# TODO: fix the couple of claim txos that dont have controlling names
|
||||
if not self.db.get(Prefixes.claim_takeover.pack_key(claim_txo.normalized_name)):
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
continue
|
||||
claim = self._fs_get_claim_by_hash(claim_hash)
|
||||
if claim:
|
||||
|
@ -793,23 +733,17 @@ class LevelDB:
|
|||
|
||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||
activated = defaultdict(list)
|
||||
for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)):
|
||||
k = Prefixes.pending_activation.unpack_key(_k)
|
||||
v = Prefixes.pending_activation.unpack_value(_v)
|
||||
for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):
|
||||
activated[v].append(k)
|
||||
return activated
|
||||
|
||||
def get_future_activated(self, height: int) -> typing.Generator[
|
||||
Tuple[PendingActivationValue, PendingActivationKey], None, None]:
|
||||
yielded = set()
|
||||
start_prefix = Prefixes.pending_activation.pack_partial_key(height + 1)
|
||||
stop_prefix = Prefixes.pending_activation.pack_partial_key(height + 1 + self.coin.maxTakeoverDelay)
|
||||
for _k, _v in self.db.iterator(start=start_prefix, stop=stop_prefix, reverse=True):
|
||||
if _v not in yielded:
|
||||
yielded.add(_v)
|
||||
v = Prefixes.pending_activation.unpack_value(_v)
|
||||
k = Prefixes.pending_activation.unpack_key(_k)
|
||||
yield v, k
|
||||
def get_future_activated(self, height: int) -> typing.Dict[PendingActivationValue, PendingActivationKey]:
|
||||
results = {}
|
||||
for k, v in self.prefix_db.pending_activation.iterate(
|
||||
start=(height + 1,), stop=(height + 1 + self.coin.maxTakeoverDelay,), reverse=True):
|
||||
if v not in results:
|
||||
results[v] = k
|
||||
return results
|
||||
|
||||
async def _read_tx_counts(self):
|
||||
if self.tx_counts is not None:
|
||||
|
@ -818,11 +752,9 @@ class LevelDB:
|
|||
# height N. So tx_counts[0] is 1 - the genesis coinbase
|
||||
|
||||
def get_counts():
|
||||
return tuple(
|
||||
Prefixes.tx_count.unpack_value(packed_tx_count).tx_count
|
||||
for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False,
|
||||
fill_cache=False)
|
||||
)
|
||||
return [
|
||||
v.tx_count for v in self.prefix_db.tx_count.iterate(include_key=False, fill_cache=False)
|
||||
]
|
||||
|
||||
tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts)
|
||||
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
|
||||
|
@ -834,20 +766,6 @@ class LevelDB:
|
|||
else:
|
||||
assert self.db_tx_count == 0
|
||||
|
||||
async def _read_txids(self):
|
||||
def get_txids():
|
||||
return list(self.db.iterator(prefix=Prefixes.tx_hash.prefix, include_key=False, fill_cache=False))
|
||||
|
||||
start = time.perf_counter()
|
||||
self.logger.info("loading txids")
|
||||
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
||||
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
|
||||
self.total_transactions = txids
|
||||
self.transaction_num_mapping = {
|
||||
txid: i for i, txid in enumerate(txids)
|
||||
}
|
||||
ts = time.perf_counter() - start
|
||||
self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4))
|
||||
|
||||
async def _read_claim_txos(self):
|
||||
def read_claim_txos():
|
||||
|
@ -870,8 +788,9 @@ class LevelDB:
|
|||
|
||||
def get_headers():
|
||||
return [
|
||||
header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False,
|
||||
fill_cache=False)
|
||||
header for header in self.prefix_db.header.iterate(
|
||||
include_key=False, fill_cache=False, deserialize_value=False
|
||||
)
|
||||
]
|
||||
|
||||
headers = await asyncio.get_event_loop().run_in_executor(None, get_headers)
|
||||
|
@ -884,23 +803,13 @@ class LevelDB:
|
|||
return int(160.6855883050695 * height)
|
||||
|
||||
async def open_dbs(self):
|
||||
if self.db:
|
||||
if self.prefix_db and not self.prefix_db.closed:
|
||||
return
|
||||
|
||||
path = os.path.join(self.env.db_dir, 'lbry-leveldb')
|
||||
is_new = os.path.isdir(path)
|
||||
self.db = plyvel.DB(
|
||||
path, create_if_missing=True, max_open_files=512,
|
||||
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
||||
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
||||
self.prefix_db = HubDB(
|
||||
os.path.join(self.env.db_dir, 'lbry-leveldb'), self.env.cache_MB, max_open_files=512
|
||||
)
|
||||
self.db_op_stack = RevertableOpStack(self.db.get, unsafe_prefixes={DB_PREFIXES.trending_spike.value})
|
||||
self.prefix_db = PrefixDB(self.db, self.db_op_stack)
|
||||
|
||||
if is_new:
|
||||
self.logger.info('created new db: %s', f'lbry-leveldb')
|
||||
else:
|
||||
self.logger.info(f'opened db: %s', f'lbry-leveldb')
|
||||
self.logger.info(f'opened db: lbry-leveldb')
|
||||
|
||||
# read db state
|
||||
self.read_db_state()
|
||||
|
@ -929,8 +838,6 @@ class LevelDB:
|
|||
|
||||
# Read TX counts (requires meta directory)
|
||||
await self._read_tx_counts()
|
||||
if self.total_transactions is None:
|
||||
await self._read_txids()
|
||||
await self._read_headers()
|
||||
await self._read_claim_txos()
|
||||
|
||||
|
@ -938,7 +845,7 @@ class LevelDB:
|
|||
await self.search_index.start()
|
||||
|
||||
def close(self):
|
||||
self.db.close()
|
||||
self.prefix_db.close()
|
||||
|
||||
# Header merkle cache
|
||||
|
||||
|
@ -953,113 +860,6 @@ class LevelDB:
|
|||
async def header_branch_and_root(self, length, height):
|
||||
return await self.header_mc.branch_and_root(length, height)
|
||||
|
||||
# Flushing
|
||||
def assert_flushed(self, flush_data):
|
||||
"""Asserts state is fully flushed."""
|
||||
assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
|
||||
assert flush_data.height == self.fs_height == self.db_height
|
||||
assert flush_data.tip == self.db_tip
|
||||
assert not len(flush_data.put_and_delete_ops)
|
||||
|
||||
def flush_dbs(self, flush_data: FlushData):
|
||||
if flush_data.height == self.db_height:
|
||||
self.assert_flushed(flush_data)
|
||||
return
|
||||
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
delete_undo_keys = []
|
||||
if min_height > 0: # delete undos for blocks deep enough they can't be reorged
|
||||
delete_undo_keys.extend(
|
||||
self.db.iterator(
|
||||
start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False
|
||||
)
|
||||
)
|
||||
delete_undo_keys.extend(
|
||||
self.db.iterator(
|
||||
start=Prefixes.touched_or_deleted.pack_key(0),
|
||||
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
|
||||
)
|
||||
)
|
||||
|
||||
with self.db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
|
||||
for staged_change in flush_data.put_and_delete_ops:
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
for delete_key in delete_undo_keys:
|
||||
batch_delete(delete_key)
|
||||
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
self.hist_flush_count += 1
|
||||
self.hist_unflushed_count = 0
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.write_db_state(batch)
|
||||
|
||||
def flush_backup(self, flush_data):
|
||||
assert flush_data.height < self.db_height
|
||||
assert not self.hist_unflushed
|
||||
|
||||
start_time = time.time()
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
###
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
# Truncate header_mc: header count is 1 more than the height.
|
||||
self.header_mc.truncate(flush_data.height + 1)
|
||||
###
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.hist_flush_count += 1
|
||||
nremoves = 0
|
||||
|
||||
with self.db.write_batch(transaction=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for op in flush_data.put_and_delete_ops:
|
||||
# print("REWIND", op)
|
||||
if op.is_put:
|
||||
batch_put(op.key, op.value)
|
||||
else:
|
||||
batch_delete(op.key)
|
||||
while self.fs_height > flush_data.height:
|
||||
self.fs_height -= 1
|
||||
|
||||
start_time = time.time()
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed {block_count:,d} blocks with '
|
||||
f'{tx_count:,d} txs in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_db_state(batch)
|
||||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. '
|
||||
f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
def raw_header(self, height):
|
||||
"""Return the binary header at the given height."""
|
||||
header, n = self.read_headers(height, 1)
|
||||
|
@ -1103,7 +903,7 @@ class LevelDB:
|
|||
if tx_height > self.db_height:
|
||||
return None, tx_height
|
||||
try:
|
||||
return self.total_transactions[tx_num], tx_height
|
||||
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height
|
||||
except IndexError:
|
||||
self.logger.exception(
|
||||
"Failed to access a cached transaction, known bug #3142 "
|
||||
|
@ -1111,9 +911,17 @@ class LevelDB:
|
|||
)
|
||||
return None, tx_height
|
||||
|
||||
def get_block_txs(self, height: int) -> List[bytes]:
|
||||
return [
|
||||
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
|
||||
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
|
||||
deserialize_value=False, include_key=False
|
||||
)
|
||||
]
|
||||
|
||||
def _fs_transactions(self, txids: Iterable[str]):
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.db.get
|
||||
tx_db_get = self.prefix_db.tx.get
|
||||
tx_cache = self._tx_and_merkle_cache
|
||||
tx_infos = {}
|
||||
|
||||
|
@ -1123,13 +931,14 @@ class LevelDB:
|
|||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = self.transaction_num_mapping.get(tx_hash_bytes)
|
||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
tx_num = None if not tx_num else tx_num.tx_num
|
||||
if tx_num is not None:
|
||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||
tx_height = bisect_right(tx_counts, tx_num)
|
||||
tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes), fill_cache=fill_cache)
|
||||
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
|
@ -1137,7 +946,7 @@ class LevelDB:
|
|||
else:
|
||||
tx_pos = tx_num - tx_counts[tx_height - 1]
|
||||
branch, root = self.merkle.branch_and_root(
|
||||
self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos
|
||||
self.get_block_txs(tx_height), tx_pos
|
||||
)
|
||||
merkle = {
|
||||
'block_height': tx_height,
|
||||
|
@ -1160,15 +969,17 @@ class LevelDB:
|
|||
raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
|
||||
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
|
||||
|
||||
def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]:
|
||||
txs = array.array('I')
|
||||
for hist in self.db.iterator(prefix=Prefixes.hashX_history.pack_partial_key(hashX), include_key=False):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
txs.extend(a)
|
||||
def read_history(self, hashX: bytes, limit: int = 1000) -> List[Tuple[bytes, int]]:
|
||||
txs = []
|
||||
txs_extend = txs.extend
|
||||
for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
|
||||
txs_extend([
|
||||
(self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num))
|
||||
for tx_num in hist
|
||||
])
|
||||
if len(txs) >= limit:
|
||||
break
|
||||
return txs.tolist()
|
||||
return txs
|
||||
|
||||
async def limited_history(self, hashX, *, limit=1000):
|
||||
"""Return an unpruned, sorted list of (tx_hash, height) tuples of
|
||||
|
@ -1177,13 +988,7 @@ class LevelDB:
|
|||
transactions. By default returns at most 1000 entries. Set
|
||||
limit to None to get them all.
|
||||
"""
|
||||
while True:
|
||||
history = await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit)
|
||||
if history is not None:
|
||||
return [(self.total_transactions[tx_num], bisect_right(self.tx_counts, tx_num)) for tx_num in history]
|
||||
self.logger.warning(f'limited_history: tx hash '
|
||||
f'not found (reorg?), retrying...')
|
||||
await sleep(0.25)
|
||||
return await asyncio.get_event_loop().run_in_executor(None, self.read_history, hashX, limit)
|
||||
|
||||
# -- Undo information
|
||||
|
||||
|
@ -1191,45 +996,33 @@ class LevelDB:
|
|||
"""Returns a height from which we should store undo info."""
|
||||
return max_height - self.env.reorg_limit + 1
|
||||
|
||||
def undo_key(self, height: int) -> bytes:
|
||||
"""DB key for undo information at the given height."""
|
||||
return Prefixes.undo.pack_key(height)
|
||||
|
||||
def read_undo_info(self, height: int):
|
||||
return self.db.get(Prefixes.undo.pack_key(height)), self.db.get(Prefixes.touched_or_deleted.pack_key(height))
|
||||
return self.prefix_db.undo.get(height), self.prefix_db.touched_or_deleted.get(height)
|
||||
|
||||
def apply_expiration_extension_fork(self):
|
||||
# TODO: this can't be reorged
|
||||
deletes = []
|
||||
adds = []
|
||||
|
||||
for k, v in self.db.iterator(prefix=Prefixes.claim_expiration.prefix):
|
||||
old_key = Prefixes.claim_expiration.unpack_key(k)
|
||||
new_key = Prefixes.claim_expiration.pack_key(
|
||||
bisect_right(self.tx_counts, old_key.tx_num) + self.coin.nExtendedClaimExpirationTime,
|
||||
old_key.tx_num, old_key.position
|
||||
for k, v in self.prefix_db.claim_expiration.iterate():
|
||||
self.prefix_db.claim_expiration.stage_delete(k, v)
|
||||
self.prefix_db.claim_expiration.stage_put(
|
||||
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
|
||||
k.tx_num, k.position), v
|
||||
)
|
||||
deletes.append(k)
|
||||
adds.append((new_key, v))
|
||||
with self.db.write_batch(transaction=True) as batch:
|
||||
for k in deletes:
|
||||
batch.delete(k)
|
||||
for k, v in adds:
|
||||
batch.put(k, v)
|
||||
self.prefix_db.unsafe_commit()
|
||||
|
||||
def write_db_state(self, batch):
|
||||
def write_db_state(self):
|
||||
"""Write (UTXO) state to the batch."""
|
||||
batch.put(
|
||||
DB_PREFIXES.db_state.value,
|
||||
DBState(
|
||||
if self.db_height > 0:
|
||||
self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
|
||||
self.prefix_db.db_state.stage_put((), (
|
||||
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
|
||||
def read_db_state(self):
|
||||
state = self.db.get(DB_PREFIXES.db_state.value)
|
||||
state = self.prefix_db.db_state.get()
|
||||
|
||||
if not state:
|
||||
self.db_height = -1
|
||||
self.db_tx_count = 0
|
||||
|
@ -1243,7 +1036,6 @@ class LevelDB:
|
|||
self.hist_comp_cursor = -1
|
||||
self.hist_db_version = max(self.DB_VERSIONS)
|
||||
else:
|
||||
state = DBState.unpack(state)
|
||||
self.db_version = state.db_version
|
||||
if self.db_version not in self.DB_VERSIONS:
|
||||
raise DBError(f'your DB version is {self.db_version} but this '
|
||||
|
@ -1264,15 +1056,21 @@ class LevelDB:
|
|||
self.hist_comp_cursor = state.comp_cursor
|
||||
self.hist_db_version = state.db_version
|
||||
|
||||
def assert_db_state(self):
|
||||
state = self.prefix_db.db_state.get()
|
||||
assert self.db_version == state.db_version
|
||||
assert self.db_height == state.height
|
||||
assert self.db_tx_count == state.tx_count
|
||||
assert self.db_tip == state.tip
|
||||
assert self.first_sync == state.first_sync
|
||||
|
||||
async def all_utxos(self, hashX):
|
||||
"""Return all UTXOs for an address sorted in no particular order."""
|
||||
def read_utxos():
|
||||
utxos = []
|
||||
utxos_append = utxos.append
|
||||
fs_tx_hash = self.fs_tx_hash
|
||||
for db_key, db_value in self.db.iterator(prefix=Prefixes.utxo.pack_partial_key(hashX)):
|
||||
k = Prefixes.utxo.unpack_key(db_key)
|
||||
v = Prefixes.utxo.unpack_value(db_value)
|
||||
for k, v in self.prefix_db.utxo.iterate(prefix=(hashX, )):
|
||||
tx_hash, height = fs_tx_hash(k.tx_num)
|
||||
utxos_append(UTXO(k.tx_num, k.nout, tx_hash, height, v.amount))
|
||||
return utxos
|
||||
|
@ -1290,14 +1088,16 @@ class LevelDB:
|
|||
utxos = []
|
||||
utxo_append = utxos.append
|
||||
for (tx_hash, nout) in prevouts:
|
||||
if tx_hash not in self.transaction_num_mapping:
|
||||
tx_num_val = self.prefix_db.tx_num.get(tx_hash)
|
||||
if not tx_num_val:
|
||||
continue
|
||||
tx_num = self.transaction_num_mapping[tx_hash]
|
||||
hashX = self.db.get(Prefixes.hashX_utxo.pack_key(tx_hash[:4], tx_num, nout))
|
||||
if not hashX:
|
||||
tx_num = tx_num_val.tx_num
|
||||
hashX_val = self.prefix_db.hashX_utxo.get(tx_hash[:4], tx_num, nout)
|
||||
if not hashX_val:
|
||||
continue
|
||||
utxo_value = self.db.get(Prefixes.utxo.pack_key(hashX, tx_num, nout))
|
||||
hashX = hashX_val.hashX
|
||||
utxo_value = self.prefix_db.utxo.get(hashX, tx_num, nout)
|
||||
if utxo_value:
|
||||
utxo_append((hashX, Prefixes.utxo.unpack_value(utxo_value).amount))
|
||||
utxo_append((hashX, utxo_value.amount))
|
||||
return utxos
|
||||
return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos)
|
||||
|
|
|
@ -115,7 +115,7 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
def check_supports(claim_id, lbrycrd_supports):
|
||||
for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))):
|
||||
support = lbrycrd_supports[i]
|
||||
self.assertEqual(support['txId'], db.total_transactions[tx_num][::-1].hex())
|
||||
self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex())
|
||||
self.assertEqual(support['n'], position)
|
||||
self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num))
|
||||
self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True))
|
||||
|
@ -127,7 +127,7 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
check_supports(c['claimId'], c['supports'])
|
||||
claim_hash = bytes.fromhex(c['claimId'])
|
||||
self.assertEqual(c['validAtHeight'], db.get_activation(
|
||||
db.transaction_num_mapping[bytes.fromhex(c['txId'])[::-1]], c['n']
|
||||
db.prefix_db.tx_num.get(bytes.fromhex(c['txId'])[::-1]).tx_num, c['n']
|
||||
))
|
||||
self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash))
|
||||
|
||||
|
@ -1451,19 +1451,13 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
|||
|
||||
async def assertBlockHash(self, height):
|
||||
bp = self.conductor.spv_node.server.bp
|
||||
|
||||
def get_txids():
|
||||
return [
|
||||
bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
|
||||
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
|
||||
]
|
||||
|
||||
block_hash = await self.blockchain.get_block_hash(height)
|
||||
|
||||
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
|
||||
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
||||
|
||||
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
||||
txids = [
|
||||
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
||||
]
|
||||
txs = await bp.db.fs_transactions(txids)
|
||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertableDelete, RevertablePut, OpStackIntegrity
|
||||
from lbry.wallet.server.db.prefixes import Prefixes
|
||||
from lbry.wallet.server.db.prefixes import ClaimToTXOPrefixRow, HubDB
|
||||
|
||||
|
||||
class TestRevertableOpStack(unittest.TestCase):
|
||||
|
@ -25,14 +27,14 @@ class TestRevertableOpStack(unittest.TestCase):
|
|||
self.stack.append_op(RevertablePut(key2, value2))
|
||||
|
||||
def test_simplify(self):
|
||||
key1 = Prefixes.claim_to_txo.pack_key(b'\x01' * 20)
|
||||
key2 = Prefixes.claim_to_txo.pack_key(b'\x02' * 20)
|
||||
key3 = Prefixes.claim_to_txo.pack_key(b'\x03' * 20)
|
||||
key4 = Prefixes.claim_to_txo.pack_key(b'\x04' * 20)
|
||||
key1 = ClaimToTXOPrefixRow.pack_key(b'\x01' * 20)
|
||||
key2 = ClaimToTXOPrefixRow.pack_key(b'\x02' * 20)
|
||||
key3 = ClaimToTXOPrefixRow.pack_key(b'\x03' * 20)
|
||||
key4 = ClaimToTXOPrefixRow.pack_key(b'\x04' * 20)
|
||||
|
||||
val1 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'derp')
|
||||
val2 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'oops')
|
||||
val3 = Prefixes.claim_to_txo.pack_value(1, 0, 1, 0, 1, 0, 'other')
|
||||
val1 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'derp')
|
||||
val2 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'oops')
|
||||
val3 = ClaimToTXOPrefixRow.pack_value(1, 0, 1, 0, 1, 0, 'other')
|
||||
|
||||
# check that we can't delete a non existent value
|
||||
with self.assertRaises(OpStackIntegrity):
|
||||
|
@ -101,3 +103,48 @@ class TestRevertableOpStack(unittest.TestCase):
|
|||
self.process_stack()
|
||||
self.assertDictEqual({key2: val3}, self.fake_db)
|
||||
|
||||
|
||||
class TestRevertablePrefixDB(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp_dir = tempfile.mkdtemp()
|
||||
self.db = HubDB(self.tmp_dir, cache_mb=1, max_open_files=32)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.db.close()
|
||||
shutil.rmtree(self.tmp_dir)
|
||||
|
||||
def test_rollback(self):
|
||||
name = 'derp'
|
||||
claim_hash1 = 20 * b'\x00'
|
||||
claim_hash2 = 20 * b'\x01'
|
||||
claim_hash3 = 20 * b'\x02'
|
||||
|
||||
takeover_height = 10000000
|
||||
|
||||
self.assertIsNone(self.db.claim_takeover.get(name))
|
||||
self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height))
|
||||
self.db.commit(10000000)
|
||||
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
|
||||
|
||||
self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height))
|
||||
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1))
|
||||
self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1))
|
||||
self.db.commit(10000001)
|
||||
self.assertIsNone(self.db.claim_takeover.get(name))
|
||||
self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2))
|
||||
self.db.commit(10000002)
|
||||
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
|
||||
|
||||
self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2))
|
||||
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3))
|
||||
self.db.commit(10000003)
|
||||
self.assertEqual(10000003, self.db.claim_takeover.get(name).height)
|
||||
|
||||
self.db.rollback(10000003)
|
||||
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
|
||||
self.db.rollback(10000002)
|
||||
self.assertIsNone(self.db.claim_takeover.get(name))
|
||||
self.db.rollback(10000001)
|
||||
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
|
||||
self.db.rollback(10000000)
|
||||
self.assertIsNone(self.db.claim_takeover.get(name))
|
||||
|
|
Loading…
Add table
Reference in a new issue