add RevertableOpStack to verify consistency of ops as they're staged

This commit is contained in:
Jack Robison 2021-06-22 17:25:23 -04:00
parent 8b37a66075
commit 4d3573724a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 186 additions and 69 deletions

View file

@ -28,7 +28,7 @@ from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effe
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes
from lbry.wallet.server.udp import StatusServer
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB
@ -610,7 +610,7 @@ class BlockProcessor:
if not spent_claim_hash_and_name: # txo is not a claim
return []
claim_hash = spent_claim_hash_and_name.claim_hash
signing_hash = self.db.get_channel_for_claim(claim_hash)
signing_hash = self.db.get_channel_for_claim(claim_hash, txin_num, txin.prev_idx)
v = self.db.get_claim_txo(claim_hash)
reposted_claim_hash = self.db.get_repost(claim_hash)
spent = StagedClaimtrieItem(
@ -634,6 +634,7 @@ class BlockProcessor:
return self._spend_support_txo(txin)
def _abandon_claim(self, claim_hash, tx_num, nout, name) -> List['RevertableOp']:
claim_from_db = False
if (tx_num, nout) in self.pending_claims:
pending = self.pending_claims.pop((tx_num, nout))
self.staged_pending_abandoned[pending.claim_hash] = pending
@ -646,9 +647,10 @@ class BlockProcessor:
v = self.db.get_claim_txo(
claim_hash
)
claim_from_db = True
claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount
signature_is_valid = v.channel_signature_is_valid
prev_signing_hash = self.db.get_channel_for_claim(claim_hash)
prev_signing_hash = self.db.get_channel_for_claim(claim_hash, tx_num, nout)
reposted_claim_hash = self.db.get_repost(claim_hash)
expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num))
self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem(
@ -690,17 +692,12 @@ class BlockProcessor:
)
)
])
if staged.signing_hash:
ops.append(RevertableDelete(*Prefixes.claim_to_channel.pack_item(staged.claim_hash, staged.signing_hash)))
return ops
def _abandon(self, spent_claims) -> List['RevertableOp']:
# Handle abandoned claims
ops = []
for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items():
# print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}")
ops.extend(self._abandon_claim(abandoned_claim_hash, tx_num, nout, name))
if staged.signing_hash and claim_from_db:
ops.append(RevertableDelete(
*Prefixes.claim_to_channel.pack_item(
staged.claim_hash, staged.tx_num, staged.position, staged.signing_hash
)
))
return ops
def _expire_claims(self, height: int):
@ -712,7 +709,11 @@ class BlockProcessor:
ops.extend(self._spend_claim_txo(txi, spent_claims))
if expired:
# do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned
ops.extend(self._abandon(spent_claims))
for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items():
# print(f"\texpire {abandoned_claim_hash.hex()} {tx_num} {nout}")
abandon_ops = self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)
if abandon_ops:
ops.extend(abandon_ops)
return ops
def _cached_get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int:
@ -881,7 +882,6 @@ class BlockProcessor:
# add the activation/delayed-activation ops
for activated, activated_txos in activated_at_height.items():
controlling = get_controlling(activated.name)
if activated.claim_hash in self.staged_pending_abandoned:
continue
reactivate = False
@ -1088,12 +1088,13 @@ class BlockProcessor:
tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position
if previous_pending_activate.height > height:
# the claim had a pending activation in the future, move it to now
ops.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, previous_pending_activate.height, name, amount
).get_remove_activate_ops()
)
if tx_num < self.tx_count:
ops.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, previous_pending_activate.height, name, amount
).get_remove_activate_ops()
)
ops.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
@ -1138,8 +1139,11 @@ class BlockProcessor:
removed_claim = self.db.get_claim_txo(removed)
if not removed_claim:
continue
amt = self._cached_get_effective_amount(removed)
if amt <= 0:
continue
ops.extend(get_remove_effective_amount_ops(
removed_claim.name, self._cached_get_effective_amount(removed), removed_claim.tx_num,
removed_claim.name, amt, removed_claim.tx_num,
removed_claim.position, removed
))
for touched in self.touched_claims_to_send_es:
@ -1148,16 +1152,20 @@ class BlockProcessor:
name, tx_num, position = pending.name, pending.tx_num, pending.position
claim_from_db = self.db.get_claim_txo(touched)
if claim_from_db:
prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position
ops.extend(get_remove_effective_amount_ops(
name, self._cached_get_effective_amount(touched), prev_tx_num, prev_position, touched
))
amount = self._cached_get_effective_amount(touched)
if amount > 0:
prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position
ops.extend(get_remove_effective_amount_ops(
name, amount, prev_tx_num, prev_position, touched
))
else:
v = self.db.get_claim_txo(touched)
name, tx_num, position = v.name, v.tx_num, v.position
ops.extend(get_remove_effective_amount_ops(
name, self._cached_get_effective_amount(touched), tx_num, position, touched
))
amt = self._cached_get_effective_amount(touched)
if amt > 0:
ops.extend(get_remove_effective_amount_ops(
name, amt, tx_num, position, touched
))
ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
tx_num, position, touched))
return ops
@ -1178,7 +1186,7 @@ class BlockProcessor:
# Use local vars for speed in the loops
put_utxo = self.utxo_cache.__setitem__
claimtrie_stash = []
claimtrie_stash = RevertableOpStack(self.db.db.get)
claimtrie_stash_extend = claimtrie_stash.extend
spend_utxo = self.spend_utxo
undo_info_append = undo_info.append
@ -1224,9 +1232,11 @@ class BlockProcessor:
claimtrie_stash_extend(claim_or_support_ops)
# Handle abandoned claims
abandon_ops = self._abandon(spent_claims)
if abandon_ops:
claimtrie_stash_extend(abandon_ops)
for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items():
# print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}")
abandon_ops = self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)
if abandon_ops:
claimtrie_stash_extend(abandon_ops)
append_hashX_by_tx(hashXs)
update_touched(hashXs)

View file

@ -1,9 +1,8 @@
import typing
from typing import Optional
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp
from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, RepostPrefixRow, RepostedPrefixRow
from lbry.wallet.server.db.prefixes import RepostPrefixRow, RepostedPrefixRow
def length_encoded_name(name: str) -> bytes:
@ -184,7 +183,9 @@ class StagedClaimtrieItem(typing.NamedTuple):
ops.append(
# channel by stream
op(
*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)
*Prefixes.claim_to_channel.pack_item(
self.claim_hash, self.tx_num, self.position, self.signing_hash
)
)
)
if self.channel_signature_is_valid:

View file

@ -83,6 +83,8 @@ class ClaimShortIDValue(typing.NamedTuple):
class ClaimToChannelKey(typing.NamedTuple):
claim_hash: bytes
tx_num: int
position: int
class ClaimToChannelValue(typing.NamedTuple):
@ -373,12 +375,19 @@ class ClaimShortIDPrefixRow(PrefixRow):
class ClaimToChannelPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_to_channel.value
key_struct = struct.Struct(b'>20s')
key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>20s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack,
struct.Struct(b'>20sL').pack,
struct.Struct(b'>20sLH').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes):
return super().pack_key(claim_hash)
def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
return super().pack_key(claim_hash, tx_num, position)
@classmethod
def pack_value(cls, signing_hash: bytes):
@ -393,8 +402,8 @@ class ClaimToChannelPrefixRow(PrefixRow):
return ClaimToChannelValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, claim_hash: bytes, signing_hash: bytes):
return cls.pack_key(claim_hash), cls.pack_value(signing_hash)
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, signing_hash: bytes):
return cls.pack_key(claim_hash, tx_num, position), cls.pack_value(signing_hash)
def channel_to_claim_helper(struct_fmt):
@ -755,6 +764,33 @@ class RepostedPrefixRow(PrefixRow):
return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash)
class UndoPrefixRow(PrefixRow):
prefix = DB_PREFIXES.undo_claimtrie.value
key_struct = struct.Struct(b'>Q')
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> int:
assert key[:1] == cls.prefix
height, = cls.key_struct.unpack(key[1:])
return height
@classmethod
def pack_value(cls, undo_ops: bytes) -> bytes:
return undo_ops
@classmethod
def unpack_value(cls, data: bytes) -> bytes:
return data
@classmethod
def pack_item(cls, height: int, undo_ops: bytes):
return cls.pack_key(height), cls.pack_value(undo_ops)
class Prefixes:
claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow
@ -778,4 +814,28 @@ class Prefixes:
repost = RepostPrefixRow
reposted_claim = RepostedPrefixRow
# undo_claimtrie = b'M'
undo = UndoPrefixRow
ROW_TYPES = {
Prefixes.claim_to_support.prefix: Prefixes.claim_to_support,
Prefixes.support_to_claim.prefix: Prefixes.support_to_claim,
Prefixes.claim_to_txo.prefix: Prefixes.claim_to_txo,
Prefixes.txo_to_claim.prefix: Prefixes.txo_to_claim,
Prefixes.claim_to_channel.prefix: Prefixes.claim_to_channel,
Prefixes.channel_to_claim.prefix: Prefixes.channel_to_claim,
Prefixes.claim_short_id.prefix: Prefixes.claim_short_id,
Prefixes.claim_expiration.prefix: Prefixes.claim_expiration,
Prefixes.claim_takeover.prefix: Prefixes.claim_takeover,
Prefixes.pending_activation.prefix: Prefixes.pending_activation,
Prefixes.activated.prefix: Prefixes.activated,
Prefixes.active_amount.prefix: Prefixes.active_amount,
Prefixes.effective_amount.prefix: Prefixes.effective_amount,
Prefixes.repost.prefix: Prefixes.repost,
Prefixes.reposted_claim.prefix: Prefixes.reposted_claim,
Prefixes.undo.prefix: Prefixes.undo
}
def auto_decode_item(key: bytes, value: bytes) -> typing.Tuple[typing.NamedTuple, typing.NamedTuple]:
return ROW_TYPES[key[:1]].unpack_item(key, value)

View file

@ -1,5 +1,6 @@
import struct
from typing import Tuple, List
from collections import OrderedDict, defaultdict
from typing import Tuple, List, Iterable, Callable, Optional
from lbry.wallet.server.db import DB_PREFIXES
_OP_STRUCT = struct.Struct('>BHH')
@ -58,8 +59,9 @@ class RevertableOp:
return (self.is_put, self.key, self.value) == (other.is_put, other.key, other.value)
def __repr__(self) -> str:
return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: " \
f"{self.key[1:].hex()} | {self.value.hex()}"
from lbry.wallet.server.db.prefixes import auto_decode_item
k, v = auto_decode_item(self.key, self.value)
return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: {k} | {v}"
class RevertableDelete(RevertableOp):
@ -74,5 +76,48 @@ class RevertablePut(RevertableOp):
return RevertableDelete(self.key, self.value)
def delete_prefix(db: 'plyvel.DB', prefix: bytes) -> List['RevertableDelete']:
return [RevertableDelete(k, v) for k, v in db.iterator(prefix=prefix)]
class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]):
self._get = get_fn
self._items = defaultdict(list)
def append(self, op: RevertableOp):
inverted = op.invert()
if self._items[op.key] and inverted == self._items[op.key][-1]:
self._items[op.key].pop()
else:
if op.is_put:
if op in self._items[op.key]:
# TODO: error
print("!! dup put", op)
# self._items[op.key].remove(op)
# assert op not in self._items[op.key], f"duplicate add for {op}"
stored = self._get(op.key)
if stored is not None:
assert RevertableDelete(op.key, stored) in self._items[op.key], f"db op ties to add on top of existing key={op}"
self._items[op.key].append(op)
else:
if op in self._items[op.key]:
# TODO: error
print("!! dup delete ", op)
# self._items[op.key].remove(op)
# assert op not in self._items[op.key], f"duplicate delete for {op}"
stored = self._get(op.key)
if stored is not None and stored != op.value:
assert RevertableDelete(op.key, stored) in self._items[op.key]
else:
assert stored is not None, f"db op tries to delete nonexistent key: {op}"
assert stored == op.value, f"db op tries to delete with incorrect value: {op}"
self._items[op.key].append(op)
def extend(self, ops: Iterable[RevertableOp]):
for op in ops:
self.append(op)
def __len__(self):
return sum(map(len, self._items.values()))
def __iter__(self):
for key, ops in self._items.items():
for op in ops:
yield op

View file

@ -34,7 +34,7 @@ from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, RevertableOpStack
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
@ -229,7 +229,7 @@ class LevelDB:
claim_amount = self.get_claim_txo_amount(claim_hash)
effective_amount = support_amount + claim_amount
channel_hash = self.get_channel_for_claim(claim_hash)
channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position)
reposted_claim_hash = self.get_repost(claim_hash)
short_url = f'{name}#{claim_hash.hex()}'
@ -410,8 +410,8 @@ class LevelDB:
count += 1
return count
def get_channel_for_claim(self, claim_hash) -> Optional[bytes]:
return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash))
def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]:
return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))
def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
expired = {}
@ -770,7 +770,7 @@ class LevelDB:
keys.append(key)
if min_height > 0:
for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value,
stop=DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(min_height),
stop=Prefixes.undo.pack_key(min_height),
include_value=False):
keys.append(key)
if keys:
@ -885,7 +885,6 @@ class LevelDB:
flush_data.headers.clear()
flush_data.block_txs.clear()
flush_data.block_hashes.clear()
op_count = len(flush_data.claimtrie_stash)
for staged_change in flush_data.claimtrie_stash:
# print("ADVANCE", staged_change)
if staged_change.is_put:
@ -895,7 +894,7 @@ class LevelDB:
flush_data.claimtrie_stash.clear()
for undo_ops, height in flush_data.undo:
batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_ops)
batch_put(*Prefixes.undo.pack_item(height, undo_ops))
flush_data.undo.clear()
self.fs_height = flush_data.height
@ -964,22 +963,25 @@ class LevelDB:
self.hist_flush_count += 1
nremoves = 0
undo_ops = RevertableOpStack(self.db.get)
for (packed_ops, height) in reversed(flush_data.undo):
undo_ops.extend(reversed(RevertableOp.unpack_stack(packed_ops)))
undo_ops.append(
RevertableDelete(*Prefixes.undo.pack_item(height, packed_ops))
)
with self.db.write_batch() as batch:
batch_put = batch.put
batch_delete = batch.delete
claim_reorg_height = self.fs_height
# print("flush undos", flush_data.undo_claimtrie)
for (packed_ops, height) in reversed(flush_data.undo):
undo_ops = RevertableOp.unpack_stack(packed_ops)
for op in reversed(undo_ops):
# print("REWIND", op)
if op.is_put:
batch_put(op.key, op.value)
else:
batch_delete(op.key)
batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height))
claim_reorg_height -= 1
for op in undo_ops:
# print("REWIND", op)
if op.is_put:
batch_put(op.key, op.value)
else:
batch_delete(op.key)
flush_data.undo.clear()
flush_data.claimtrie_stash.clear()
@ -1209,8 +1211,7 @@ class LevelDB:
def read_undo_info(self, height):
"""Read undo information from a file for the current height."""
undo_claims = self.db.get(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(self.fs_height))
return self.db.get(self.undo_key(height)), undo_claims
return self.db.get(self.undo_key(height)), self.db.get(Prefixes.undo.pack_key(self.fs_height))
def raw_block_prefix(self):
return 'block'