add RevertableOpStack to verify consistency of ops as they're staged
commit 468ed91ee3
parent b856e2120a
5 changed files with 186 additions and 69 deletions
**lbry/wallet/server/block_processor.py**

```diff
@@ -28,7 +28,7 @@ from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effe
 from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes
 from lbry.wallet.server.udp import StatusServer
-from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete
+from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
 if typing.TYPE_CHECKING:
     from lbry.wallet.server.leveldb import LevelDB
 
@@ -610,7 +610,7 @@ class BlockProcessor:
         if not spent_claim_hash_and_name:  # txo is not a claim
             return []
         claim_hash = spent_claim_hash_and_name.claim_hash
-        signing_hash = self.db.get_channel_for_claim(claim_hash)
+        signing_hash = self.db.get_channel_for_claim(claim_hash, txin_num, txin.prev_idx)
         v = self.db.get_claim_txo(claim_hash)
         reposted_claim_hash = self.db.get_repost(claim_hash)
         spent = StagedClaimtrieItem(
@@ -634,6 +634,7 @@ class BlockProcessor:
         return self._spend_support_txo(txin)
 
     def _abandon_claim(self, claim_hash, tx_num, nout, name) -> List['RevertableOp']:
+        claim_from_db = False
        if (tx_num, nout) in self.pending_claims:
            pending = self.pending_claims.pop((tx_num, nout))
            self.staged_pending_abandoned[pending.claim_hash] = pending
@@ -646,9 +647,10 @@ class BlockProcessor:
             v = self.db.get_claim_txo(
                 claim_hash
             )
+            claim_from_db = True
             claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount
             signature_is_valid = v.channel_signature_is_valid
-        prev_signing_hash = self.db.get_channel_for_claim(claim_hash)
+        prev_signing_hash = self.db.get_channel_for_claim(claim_hash, tx_num, nout)
         reposted_claim_hash = self.db.get_repost(claim_hash)
         expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num))
         self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem(
@@ -690,17 +692,12 @@ class BlockProcessor:
                 )
             )
         ])
-        if staged.signing_hash:
-            ops.append(RevertableDelete(*Prefixes.claim_to_channel.pack_item(staged.claim_hash, staged.signing_hash)))
-        return ops
-
-    def _abandon(self, spent_claims) -> List['RevertableOp']:
-        # Handle abandoned claims
-        ops = []
-
-        for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items():
-            # print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}")
-            ops.extend(self._abandon_claim(abandoned_claim_hash, tx_num, nout, name))
+        if staged.signing_hash and claim_from_db:
+            ops.append(RevertableDelete(
+                *Prefixes.claim_to_channel.pack_item(
+                    staged.claim_hash, staged.tx_num, staged.position, staged.signing_hash
+                )
+            ))
         return ops
 
     def _expire_claims(self, height: int):
@@ -712,7 +709,11 @@ class BlockProcessor:
             ops.extend(self._spend_claim_txo(txi, spent_claims))
         if expired:
             # do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned
-            ops.extend(self._abandon(spent_claims))
+            for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items():
+                # print(f"\texpire {abandoned_claim_hash.hex()} {tx_num} {nout}")
+                abandon_ops = self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)
+                if abandon_ops:
+                    ops.extend(abandon_ops)
         return ops
 
     def _cached_get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int:
@@ -881,7 +882,6 @@ class BlockProcessor:
         # add the activation/delayed-activation ops
         for activated, activated_txos in activated_at_height.items():
             controlling = get_controlling(activated.name)
-
             if activated.claim_hash in self.staged_pending_abandoned:
                 continue
             reactivate = False
@@ -1088,6 +1088,7 @@ class BlockProcessor:
                     tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position
                     if previous_pending_activate.height > height:
                         # the claim had a pending activation in the future, move it to now
+                        if tx_num < self.tx_count:
                             ops.extend(
                                 StagedActivation(
                                     ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
@@ -1138,8 +1139,11 @@ class BlockProcessor:
                 removed_claim = self.db.get_claim_txo(removed)
                 if not removed_claim:
                     continue
+                amt = self._cached_get_effective_amount(removed)
+                if amt <= 0:
+                    continue
                 ops.extend(get_remove_effective_amount_ops(
-                    removed_claim.name, self._cached_get_effective_amount(removed), removed_claim.tx_num,
+                    removed_claim.name, amt, removed_claim.tx_num,
                     removed_claim.position, removed
                 ))
             for touched in self.touched_claims_to_send_es:
@@ -1148,15 +1152,19 @@ class BlockProcessor:
                 name, tx_num, position = pending.name, pending.tx_num, pending.position
                 claim_from_db = self.db.get_claim_txo(touched)
                 if claim_from_db:
+                    amount = self._cached_get_effective_amount(touched)
+                    if amount > 0:
                         prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position
                         ops.extend(get_remove_effective_amount_ops(
-                            name, self._cached_get_effective_amount(touched), prev_tx_num, prev_position, touched
+                            name, amount, prev_tx_num, prev_position, touched
                         ))
             else:
                 v = self.db.get_claim_txo(touched)
                 name, tx_num, position = v.name, v.tx_num, v.position
+                amt = self._cached_get_effective_amount(touched)
+                if amt > 0:
                     ops.extend(get_remove_effective_amount_ops(
-                        name, self._cached_get_effective_amount(touched), tx_num, position, touched
+                        name, amt, tx_num, position, touched
                     ))
             ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
                                                     tx_num, position, touched))
@@ -1178,7 +1186,7 @@ class BlockProcessor:
 
         # Use local vars for speed in the loops
         put_utxo = self.utxo_cache.__setitem__
-        claimtrie_stash = []
+        claimtrie_stash = RevertableOpStack(self.db.db.get)
         claimtrie_stash_extend = claimtrie_stash.extend
         spend_utxo = self.spend_utxo
         undo_info_append = undo_info.append
@@ -1224,7 +1232,9 @@ class BlockProcessor:
                     claimtrie_stash_extend(claim_or_support_ops)
 
             # Handle abandoned claims
-            abandon_ops = self._abandon(spent_claims)
+            for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items():
+                # print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}")
+                abandon_ops = self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)
                 if abandon_ops:
                     claimtrie_stash_extend(abandon_ops)
 
```
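The new stash keeps the list-like surface (`extend`, `append`, `__len__`, iteration) that the surrounding code already uses, so only the construction changes; each op is now checked against what the database currently holds as it is staged. A minimal sketch of that behavior (not part of the commit), with a plain dict standing in for the handle behind `self.db.db.get` and placeholder 20-byte hashes:

```python
from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.revertable import RevertableDelete, RevertableOpStack

# placeholder hashes; any 20-byte values pack fine
claim_hash, channel_hash = bytes(20), bytes(range(20))
key, value = Prefixes.claim_to_channel.pack_item(claim_hash, 1, 0, channel_hash)

db_state = {key: value}                            # stands in for self.db.db
claimtrie_stash = RevertableOpStack(db_state.get)  # as in the hunk above
claimtrie_stash_extend = claimtrie_stash.extend    # cached bound method, same pattern as the diff

# deleting a row with the exact value the "db" holds passes the consistency check
claimtrie_stash_extend([RevertableDelete(key, value)])
assert len(claimtrie_stash) == 1
```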
**lbry/wallet/server/db/claimtrie.py**

```diff
@@ -1,9 +1,8 @@
 import typing
 from typing import Optional
-from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
-from lbry.wallet.server.db import DB_PREFIXES
+from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp
 from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow
-from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, RepostPrefixRow, RepostedPrefixRow
+from lbry.wallet.server.db.prefixes import RepostPrefixRow, RepostedPrefixRow
 
 
 def length_encoded_name(name: str) -> bytes:
@@ -184,7 +183,9 @@ class StagedClaimtrieItem(typing.NamedTuple):
             ops.append(
                 # channel by stream
                 op(
-                    *Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)
+                    *Prefixes.claim_to_channel.pack_item(
+                        self.claim_hash, self.tx_num, self.position, self.signing_hash
+                    )
                 )
             )
         if self.channel_signature_is_valid:
```
**lbry/wallet/server/db/prefixes.py**

```diff
@@ -83,6 +83,8 @@ class ClaimShortIDValue(typing.NamedTuple):
 
 class ClaimToChannelKey(typing.NamedTuple):
     claim_hash: bytes
+    tx_num: int
+    position: int
 
 
 class ClaimToChannelValue(typing.NamedTuple):
@@ -373,12 +375,19 @@ class ClaimShortIDPrefixRow(PrefixRow):
 
 class ClaimToChannelPrefixRow(PrefixRow):
     prefix = DB_PREFIXES.claim_to_channel.value
-    key_struct = struct.Struct(b'>20s')
+    key_struct = struct.Struct(b'>20sLH')
     value_struct = struct.Struct(b'>20s')
 
+    key_part_lambdas = [
+        lambda: b'',
+        struct.Struct(b'>20s').pack,
+        struct.Struct(b'>20sL').pack,
+        struct.Struct(b'>20sLH').pack
+    ]
+
     @classmethod
-    def pack_key(cls, claim_hash: bytes):
-        return super().pack_key(claim_hash)
+    def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
+        return super().pack_key(claim_hash, tx_num, position)
 
     @classmethod
     def pack_value(cls, signing_hash: bytes):
@@ -393,8 +402,8 @@ class ClaimToChannelPrefixRow(PrefixRow):
         return ClaimToChannelValue(*super().unpack_value(data))
 
     @classmethod
-    def pack_item(cls, claim_hash: bytes, signing_hash: bytes):
-        return cls.pack_key(claim_hash), cls.pack_value(signing_hash)
+    def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, signing_hash: bytes):
+        return cls.pack_key(claim_hash, tx_num, position), cls.pack_value(signing_hash)
 
 
 def channel_to_claim_helper(struct_fmt):
```
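A sketch (not from the commit) of what the widened key means: a `claim_to_channel` row is now addressed by the claim hash plus the claim txo's `tx_num` and `position`, and the new `key_part_lambdas` list supplies packers for the shorter key prefixes that range scans over a single claim hash would use. The hashes below are placeholders, and the length check assumes the base `PrefixRow.pack_key` prepends the one-byte prefix, as the `unpack_key` implementations in this file imply:

```python
from lbry.wallet.server.db.prefixes import Prefixes

claim_hash, channel_hash = bytes(20), bytes(range(20))

key = Prefixes.claim_to_channel.pack_key(claim_hash, tx_num=1040, position=2)
assert len(key) == 1 + 20 + 4 + 2   # prefix byte + 20s claim hash + uint32 tx_num + uint16 position

key, value = Prefixes.claim_to_channel.pack_item(claim_hash, 1040, 2, channel_hash)
assert Prefixes.claim_to_channel.unpack_value(value) == (channel_hash,)   # single-field value row
```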
**lbry/wallet/server/db/prefixes.py** (continued)

```diff
@@ -755,6 +764,33 @@ class RepostedPrefixRow(PrefixRow):
         return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash)
 
 
+class UndoPrefixRow(PrefixRow):
+    prefix = DB_PREFIXES.undo_claimtrie.value
+    key_struct = struct.Struct(b'>Q')
+
+    @classmethod
+    def pack_key(cls, height: int):
+        return super().pack_key(height)
+
+    @classmethod
+    def unpack_key(cls, key: bytes) -> int:
+        assert key[:1] == cls.prefix
+        height, = cls.key_struct.unpack(key[1:])
+        return height
+
+    @classmethod
+    def pack_value(cls, undo_ops: bytes) -> bytes:
+        return undo_ops
+
+    @classmethod
+    def unpack_value(cls, data: bytes) -> bytes:
+        return data
+
+    @classmethod
+    def pack_item(cls, height: int, undo_ops: bytes):
+        return cls.pack_key(height), cls.pack_value(undo_ops)
+
+
 class Prefixes:
     claim_to_support = ClaimToSupportPrefixRow
     support_to_claim = SupportToClaimPrefixRow
@@ -778,4 +814,28 @@ class Prefixes:
     repost = RepostPrefixRow
     reposted_claim = RepostedPrefixRow
 
-    # undo_claimtrie = b'M'
+    undo = UndoPrefixRow
+
+
+ROW_TYPES = {
+    Prefixes.claim_to_support.prefix: Prefixes.claim_to_support,
+    Prefixes.support_to_claim.prefix: Prefixes.support_to_claim,
+    Prefixes.claim_to_txo.prefix: Prefixes.claim_to_txo,
+    Prefixes.txo_to_claim.prefix: Prefixes.txo_to_claim,
+    Prefixes.claim_to_channel.prefix: Prefixes.claim_to_channel,
+    Prefixes.channel_to_claim.prefix: Prefixes.channel_to_claim,
+    Prefixes.claim_short_id.prefix: Prefixes.claim_short_id,
+    Prefixes.claim_expiration.prefix: Prefixes.claim_expiration,
+    Prefixes.claim_takeover.prefix: Prefixes.claim_takeover,
+    Prefixes.pending_activation.prefix: Prefixes.pending_activation,
+    Prefixes.activated.prefix: Prefixes.activated,
+    Prefixes.active_amount.prefix: Prefixes.active_amount,
+    Prefixes.effective_amount.prefix: Prefixes.effective_amount,
+    Prefixes.repost.prefix: Prefixes.repost,
+    Prefixes.reposted_claim.prefix: Prefixes.reposted_claim,
+    Prefixes.undo.prefix: Prefixes.undo
+}
+
+
+def auto_decode_item(key: bytes, value: bytes) -> typing.Tuple[typing.NamedTuple, typing.NamedTuple]:
+    return ROW_TYPES[key[:1]].unpack_item(key, value)
```
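A sketch (not from the commit) of the round trip the new `UndoPrefixRow` gives; `packed_undo_ops` is a placeholder for the serialized op stack the block processor actually stores:

```python
from lbry.wallet.server.db.prefixes import Prefixes

height = 1_050_000
packed_undo_ops = b'placeholder serialized ops'     # stands in for the real undo blob

key, value = Prefixes.undo.pack_item(height, packed_undo_ops)
assert Prefixes.undo.unpack_key(key) == height               # unpack_key returns the bare height
assert Prefixes.undo.unpack_value(value) == packed_undo_ops  # values pass through unchanged
```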
**lbry/wallet/server/db/revertable.py**

```diff
@@ -1,5 +1,6 @@
 import struct
-from typing import Tuple, List
+from collections import OrderedDict, defaultdict
+from typing import Tuple, List, Iterable, Callable, Optional
 from lbry.wallet.server.db import DB_PREFIXES
 
 _OP_STRUCT = struct.Struct('>BHH')
@@ -58,8 +59,9 @@ class RevertableOp:
         return (self.is_put, self.key, self.value) == (other.is_put, other.key, other.value)
 
     def __repr__(self) -> str:
-        return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: " \
-               f"{self.key[1:].hex()} | {self.value.hex()}"
+        from lbry.wallet.server.db.prefixes import auto_decode_item
+        k, v = auto_decode_item(self.key, self.value)
+        return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: {k} | {v}"
 
 
 class RevertableDelete(RevertableOp):
@@ -74,5 +76,48 @@ class RevertablePut(RevertableOp):
         return RevertableDelete(self.key, self.value)
 
 
-def delete_prefix(db: 'plyvel.DB', prefix: bytes) -> List['RevertableDelete']:
-    return [RevertableDelete(k, v) for k, v in db.iterator(prefix=prefix)]
+class RevertableOpStack:
+    def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]):
+        self._get = get_fn
+        self._items = defaultdict(list)
+
+    def append(self, op: RevertableOp):
+        inverted = op.invert()
+        if self._items[op.key] and inverted == self._items[op.key][-1]:
+            self._items[op.key].pop()
+        else:
+            if op.is_put:
+                if op in self._items[op.key]:
+                    # TODO: error
+                    print("!! dup put", op)
+                    # self._items[op.key].remove(op)
+                # assert op not in self._items[op.key], f"duplicate add for {op}"
+                stored = self._get(op.key)
+                if stored is not None:
+                    assert RevertableDelete(op.key, stored) in self._items[op.key], f"db op ties to add on top of existing key={op}"
+                self._items[op.key].append(op)
+            else:
+                if op in self._items[op.key]:
+                    # TODO: error
+                    print("!! dup delete ", op)
+                    # self._items[op.key].remove(op)
+                # assert op not in self._items[op.key], f"duplicate delete for {op}"
+                stored = self._get(op.key)
+                if stored is not None and stored != op.value:
+                    assert RevertableDelete(op.key, stored) in self._items[op.key]
+                else:
+                    assert stored is not None, f"db op tries to delete nonexistent key: {op}"
+                    assert stored == op.value, f"db op tries to delete with incorrect value: {op}"
+                self._items[op.key].append(op)
+
+    def extend(self, ops: Iterable[RevertableOp]):
+        for op in ops:
+            self.append(op)
+
+    def __len__(self):
+        return sum(map(len, self._items.values()))
+
+    def __iter__(self):
+        for key, ops in self._items.items():
+            for op in ops:
+                yield op
```
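A sketch (not from the commit) of the invariants the stack enforces while ops are staged. A plain dict plays the role of the database, and the rows are built with the prefix helpers changed in this commit so the ops look like real `claim_to_channel` entries:

```python
from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOpStack

claim_hash, channel_hash = bytes(20), bytes(range(20))
key, value = Prefixes.claim_to_channel.pack_item(claim_hash, 1, 0, channel_hash)

db_state = {}                             # only a get() callable is required
stack = RevertableOpStack(db_state.get)

# an op followed by its exact inverse cancels out instead of piling up
stack.extend([RevertablePut(key, value), RevertableDelete(key, value)])
assert len(stack) == 0

# deleting a row the db holds (with the matching value) is accepted, and a new
# put for the same key can then be staged on top of that delete
db_state[key] = value
new_value = Prefixes.claim_to_channel.pack_value(bytes(20))
stack.extend([RevertableDelete(key, value), RevertablePut(key, new_value)])
assert len(stack) == 2
# anything inconsistent (deleting a missing key, deleting with the wrong value,
# or putting over an existing row without first staging a delete) trips an assertion
```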
**lbry/wallet/server/leveldb.py**

```diff
@@ -34,7 +34,7 @@ from lbry.wallet.server.tx import TxInput
 from lbry.wallet.server.merkle import Merkle, MerkleCache
 from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
 from lbry.wallet.server.storage import db_class
-from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
+from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, RevertableOpStack
 from lbry.wallet.server.db import DB_PREFIXES
 from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
 from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
@@ -229,7 +229,7 @@ class LevelDB:
         claim_amount = self.get_claim_txo_amount(claim_hash)
 
         effective_amount = support_amount + claim_amount
-        channel_hash = self.get_channel_for_claim(claim_hash)
+        channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position)
         reposted_claim_hash = self.get_repost(claim_hash)
 
         short_url = f'{name}#{claim_hash.hex()}'
@@ -410,8 +410,8 @@ class LevelDB:
             count += 1
         return count
 
-    def get_channel_for_claim(self, claim_hash) -> Optional[bytes]:
-        return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash))
+    def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]:
+        return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))
 
     def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
         expired = {}
@@ -770,7 +770,7 @@ class LevelDB:
                 keys.append(key)
         if min_height > 0:
             for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value,
-                                        stop=DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(min_height),
+                                        stop=Prefixes.undo.pack_key(min_height),
                                         include_value=False):
                 keys.append(key)
         if keys:
@@ -885,7 +885,6 @@ class LevelDB:
             flush_data.headers.clear()
             flush_data.block_txs.clear()
             flush_data.block_hashes.clear()
-            op_count = len(flush_data.claimtrie_stash)
             for staged_change in flush_data.claimtrie_stash:
                 # print("ADVANCE", staged_change)
                 if staged_change.is_put:
@@ -895,7 +894,7 @@ class LevelDB:
             flush_data.claimtrie_stash.clear()
 
             for undo_ops, height in flush_data.undo:
-                batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_ops)
+                batch_put(*Prefixes.undo.pack_item(height, undo_ops))
             flush_data.undo.clear()
 
             self.fs_height = flush_data.height
@@ -964,22 +963,25 @@ class LevelDB:
         self.hist_flush_count += 1
         nremoves = 0
 
+        undo_ops = RevertableOpStack(self.db.get)
+
+        for (packed_ops, height) in reversed(flush_data.undo):
+            undo_ops.extend(reversed(RevertableOp.unpack_stack(packed_ops)))
+            undo_ops.append(
+                RevertableDelete(*Prefixes.undo.pack_item(height, packed_ops))
+            )
+
         with self.db.write_batch() as batch:
             batch_put = batch.put
             batch_delete = batch.delete
 
-            claim_reorg_height = self.fs_height
             # print("flush undos", flush_data.undo_claimtrie)
-            for (packed_ops, height) in reversed(flush_data.undo):
-                undo_ops = RevertableOp.unpack_stack(packed_ops)
-                for op in reversed(undo_ops):
+            for op in undo_ops:
                 # print("REWIND", op)
                 if op.is_put:
                     batch_put(op.key, op.value)
                 else:
                     batch_delete(op.key)
-                batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height))
-                claim_reorg_height -= 1
 
             flush_data.undo.clear()
             flush_data.claimtrie_stash.clear()
@@ -1209,8 +1211,7 @@ class LevelDB:
 
     def read_undo_info(self, height):
         """Read undo information from a file for the current height."""
-        undo_claims = self.db.get(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(self.fs_height))
-        return self.db.get(self.undo_key(height)), undo_claims
+        return self.db.get(self.undo_key(height)), self.db.get(Prefixes.undo.pack_key(self.fs_height))
 
     def raw_block_prefix(self):
         return 'block'
```
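For completeness, a sketch (not from the commit) of the rewind shape used in the hunk above: the staged undo ops are simply replayed in order, with a dict standing in for the write batch and for the row written while advancing:

```python
from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.revertable import RevertableDelete, RevertableOpStack

claim_hash, channel_hash = bytes(20), bytes(range(20))
key, value = Prefixes.claim_to_channel.pack_item(claim_hash, 1, 0, channel_hash)

db_state = {key: value}                          # pretend this row was written on advance
undo_ops = RevertableOpStack(db_state.get)
undo_ops.append(RevertableDelete(key, value))    # the inverse of the original put

for op in undo_ops:                              # mirrors the rewind loop above
    if op.is_put:
        db_state[op.key] = op.value
    else:
        db_state.pop(op.key)
assert key not in db_state
```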