diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 2dc78cc84..a51c70ca2 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -28,7 +28,7 @@ from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effe from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes from lbry.wallet.server.udp import StatusServer -from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete +from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack if typing.TYPE_CHECKING: from lbry.wallet.server.leveldb import LevelDB @@ -610,7 +610,7 @@ class BlockProcessor: if not spent_claim_hash_and_name: # txo is not a claim return [] claim_hash = spent_claim_hash_and_name.claim_hash - signing_hash = self.db.get_channel_for_claim(claim_hash) + signing_hash = self.db.get_channel_for_claim(claim_hash, txin_num, txin.prev_idx) v = self.db.get_claim_txo(claim_hash) reposted_claim_hash = self.db.get_repost(claim_hash) spent = StagedClaimtrieItem( @@ -634,6 +634,7 @@ class BlockProcessor: return self._spend_support_txo(txin) def _abandon_claim(self, claim_hash, tx_num, nout, name) -> List['RevertableOp']: + claim_from_db = False if (tx_num, nout) in self.pending_claims: pending = self.pending_claims.pop((tx_num, nout)) self.staged_pending_abandoned[pending.claim_hash] = pending @@ -646,9 +647,10 @@ class BlockProcessor: v = self.db.get_claim_txo( claim_hash ) + claim_from_db = True claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount signature_is_valid = v.channel_signature_is_valid - prev_signing_hash = self.db.get_channel_for_claim(claim_hash) + prev_signing_hash = self.db.get_channel_for_claim(claim_hash, tx_num, nout) reposted_claim_hash = self.db.get_repost(claim_hash) expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num)) self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem( @@ -690,17 +692,12 @@ class BlockProcessor: ) ) ]) - if staged.signing_hash: - ops.append(RevertableDelete(*Prefixes.claim_to_channel.pack_item(staged.claim_hash, staged.signing_hash))) - return ops - - def _abandon(self, spent_claims) -> List['RevertableOp']: - # Handle abandoned claims - ops = [] - - for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items(): - # print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}") - ops.extend(self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)) + if staged.signing_hash and claim_from_db: + ops.append(RevertableDelete( + *Prefixes.claim_to_channel.pack_item( + staged.claim_hash, staged.tx_num, staged.position, staged.signing_hash + ) + )) return ops def _expire_claims(self, height: int): @@ -712,7 +709,11 @@ class BlockProcessor: ops.extend(self._spend_claim_txo(txi, spent_claims)) if expired: # do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned - ops.extend(self._abandon(spent_claims)) + for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items(): + # print(f"\texpire {abandoned_claim_hash.hex()} {tx_num} {nout}") + abandon_ops = self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + if abandon_ops: + ops.extend(abandon_ops) return ops def _cached_get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: @@ -881,7 +882,6 @@ class BlockProcessor: # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): controlling = get_controlling(activated.name) - if activated.claim_hash in self.staged_pending_abandoned: continue reactivate = False @@ -1088,12 +1088,13 @@ class BlockProcessor: tx_num, position = previous_pending_activate.tx_num, previous_pending_activate.position if previous_pending_activate.height > height: # the claim had a pending activation in the future, move it to now - ops.extend( - StagedActivation( - ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, - position, previous_pending_activate.height, name, amount - ).get_remove_activate_ops() - ) + if tx_num < self.tx_count: + ops.extend( + StagedActivation( + ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, + position, previous_pending_activate.height, name, amount + ).get_remove_activate_ops() + ) ops.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, @@ -1138,8 +1139,11 @@ class BlockProcessor: removed_claim = self.db.get_claim_txo(removed) if not removed_claim: continue + amt = self._cached_get_effective_amount(removed) + if amt <= 0: + continue ops.extend(get_remove_effective_amount_ops( - removed_claim.name, self._cached_get_effective_amount(removed), removed_claim.tx_num, + removed_claim.name, amt, removed_claim.tx_num, removed_claim.position, removed )) for touched in self.touched_claims_to_send_es: @@ -1148,16 +1152,20 @@ class BlockProcessor: name, tx_num, position = pending.name, pending.tx_num, pending.position claim_from_db = self.db.get_claim_txo(touched) if claim_from_db: - prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position - ops.extend(get_remove_effective_amount_ops( - name, self._cached_get_effective_amount(touched), prev_tx_num, prev_position, touched - )) + amount = self._cached_get_effective_amount(touched) + if amount > 0: + prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position + ops.extend(get_remove_effective_amount_ops( + name, amount, prev_tx_num, prev_position, touched + )) else: v = self.db.get_claim_txo(touched) name, tx_num, position = v.name, v.tx_num, v.position - ops.extend(get_remove_effective_amount_ops( - name, self._cached_get_effective_amount(touched), tx_num, position, touched - )) + amt = self._cached_get_effective_amount(touched) + if amt > 0: + ops.extend(get_remove_effective_amount_ops( + name, amt, tx_num, position, touched + )) ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), tx_num, position, touched)) return ops @@ -1178,7 +1186,7 @@ class BlockProcessor: # Use local vars for speed in the loops put_utxo = self.utxo_cache.__setitem__ - claimtrie_stash = [] + claimtrie_stash = RevertableOpStack(self.db.db.get) claimtrie_stash_extend = claimtrie_stash.extend spend_utxo = self.spend_utxo undo_info_append = undo_info.append @@ -1224,9 +1232,11 @@ class BlockProcessor: claimtrie_stash_extend(claim_or_support_ops) # Handle abandoned claims - abandon_ops = self._abandon(spent_claims) - if abandon_ops: - claimtrie_stash_extend(abandon_ops) + for abandoned_claim_hash, (tx_num, nout, name) in spent_claims.items(): + # print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}") + abandon_ops = self._abandon_claim(abandoned_claim_hash, tx_num, nout, name) + if abandon_ops: + claimtrie_stash_extend(abandon_ops) append_hashX_by_tx(hashXs) update_touched(hashXs) diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 9d20105ee..65a2b02a3 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -1,9 +1,8 @@ import typing from typing import Optional -from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix -from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow -from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, RepostPrefixRow, RepostedPrefixRow +from lbry.wallet.server.db.prefixes import RepostPrefixRow, RepostedPrefixRow def length_encoded_name(name: str) -> bytes: @@ -184,7 +183,9 @@ class StagedClaimtrieItem(typing.NamedTuple): ops.append( # channel by stream op( - *Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash) + *Prefixes.claim_to_channel.pack_item( + self.claim_hash, self.tx_num, self.position, self.signing_hash + ) ) ) if self.channel_signature_is_valid: diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 32a2f5bbf..c591b5761 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -83,6 +83,8 @@ class ClaimShortIDValue(typing.NamedTuple): class ClaimToChannelKey(typing.NamedTuple): claim_hash: bytes + tx_num: int + position: int class ClaimToChannelValue(typing.NamedTuple): @@ -373,12 +375,19 @@ class ClaimShortIDPrefixRow(PrefixRow): class ClaimToChannelPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_to_channel.value - key_struct = struct.Struct(b'>20s') + key_struct = struct.Struct(b'>20sLH') value_struct = struct.Struct(b'>20s') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack, + struct.Struct(b'>20sL').pack, + struct.Struct(b'>20sLH').pack + ] + @classmethod - def pack_key(cls, claim_hash: bytes): - return super().pack_key(claim_hash) + def pack_key(cls, claim_hash: bytes, tx_num: int, position: int): + return super().pack_key(claim_hash, tx_num, position) @classmethod def pack_value(cls, signing_hash: bytes): @@ -393,8 +402,8 @@ class ClaimToChannelPrefixRow(PrefixRow): return ClaimToChannelValue(*super().unpack_value(data)) @classmethod - def pack_item(cls, claim_hash: bytes, signing_hash: bytes): - return cls.pack_key(claim_hash), cls.pack_value(signing_hash) + def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, signing_hash: bytes): + return cls.pack_key(claim_hash, tx_num, position), cls.pack_value(signing_hash) def channel_to_claim_helper(struct_fmt): @@ -755,6 +764,33 @@ class RepostedPrefixRow(PrefixRow): return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash) +class UndoPrefixRow(PrefixRow): + prefix = DB_PREFIXES.undo_claimtrie.value + key_struct = struct.Struct(b'>Q') + + @classmethod + def pack_key(cls, height: int): + return super().pack_key(height) + + @classmethod + def unpack_key(cls, key: bytes) -> int: + assert key[:1] == cls.prefix + height, = cls.key_struct.unpack(key[1:]) + return height + + @classmethod + def pack_value(cls, undo_ops: bytes) -> bytes: + return undo_ops + + @classmethod + def unpack_value(cls, data: bytes) -> bytes: + return data + + @classmethod + def pack_item(cls, height: int, undo_ops: bytes): + return cls.pack_key(height), cls.pack_value(undo_ops) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -778,4 +814,28 @@ class Prefixes: repost = RepostPrefixRow reposted_claim = RepostedPrefixRow - # undo_claimtrie = b'M' + undo = UndoPrefixRow + + +ROW_TYPES = { + Prefixes.claim_to_support.prefix: Prefixes.claim_to_support, + Prefixes.support_to_claim.prefix: Prefixes.support_to_claim, + Prefixes.claim_to_txo.prefix: Prefixes.claim_to_txo, + Prefixes.txo_to_claim.prefix: Prefixes.txo_to_claim, + Prefixes.claim_to_channel.prefix: Prefixes.claim_to_channel, + Prefixes.channel_to_claim.prefix: Prefixes.channel_to_claim, + Prefixes.claim_short_id.prefix: Prefixes.claim_short_id, + Prefixes.claim_expiration.prefix: Prefixes.claim_expiration, + Prefixes.claim_takeover.prefix: Prefixes.claim_takeover, + Prefixes.pending_activation.prefix: Prefixes.pending_activation, + Prefixes.activated.prefix: Prefixes.activated, + Prefixes.active_amount.prefix: Prefixes.active_amount, + Prefixes.effective_amount.prefix: Prefixes.effective_amount, + Prefixes.repost.prefix: Prefixes.repost, + Prefixes.reposted_claim.prefix: Prefixes.reposted_claim, + Prefixes.undo.prefix: Prefixes.undo +} + + +def auto_decode_item(key: bytes, value: bytes) -> typing.Tuple[typing.NamedTuple, typing.NamedTuple]: + return ROW_TYPES[key[:1]].unpack_item(key, value) diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index bd391cf88..026f404b6 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -1,5 +1,6 @@ import struct -from typing import Tuple, List +from collections import OrderedDict, defaultdict +from typing import Tuple, List, Iterable, Callable, Optional from lbry.wallet.server.db import DB_PREFIXES _OP_STRUCT = struct.Struct('>BHH') @@ -58,8 +59,9 @@ class RevertableOp: return (self.is_put, self.key, self.value) == (other.is_put, other.key, other.value) def __repr__(self) -> str: - return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: " \ - f"{self.key[1:].hex()} | {self.value.hex()}" + from lbry.wallet.server.db.prefixes import auto_decode_item + k, v = auto_decode_item(self.key, self.value) + return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: {k} | {v}" class RevertableDelete(RevertableOp): @@ -74,5 +76,48 @@ class RevertablePut(RevertableOp): return RevertableDelete(self.key, self.value) -def delete_prefix(db: 'plyvel.DB', prefix: bytes) -> List['RevertableDelete']: - return [RevertableDelete(k, v) for k, v in db.iterator(prefix=prefix)] +class RevertableOpStack: + def __init__(self, get_fn: Callable[[bytes], Optional[bytes]]): + self._get = get_fn + self._items = defaultdict(list) + + def append(self, op: RevertableOp): + inverted = op.invert() + if self._items[op.key] and inverted == self._items[op.key][-1]: + self._items[op.key].pop() + else: + if op.is_put: + if op in self._items[op.key]: + # TODO: error + print("!! dup put", op) + # self._items[op.key].remove(op) + # assert op not in self._items[op.key], f"duplicate add for {op}" + stored = self._get(op.key) + if stored is not None: + assert RevertableDelete(op.key, stored) in self._items[op.key], f"db op ties to add on top of existing key={op}" + self._items[op.key].append(op) + else: + if op in self._items[op.key]: + # TODO: error + print("!! dup delete ", op) + # self._items[op.key].remove(op) + # assert op not in self._items[op.key], f"duplicate delete for {op}" + stored = self._get(op.key) + if stored is not None and stored != op.value: + assert RevertableDelete(op.key, stored) in self._items[op.key] + else: + assert stored is not None, f"db op tries to delete nonexistent key: {op}" + assert stored == op.value, f"db op tries to delete with incorrect value: {op}" + self._items[op.key].append(op) + + def extend(self, ops: Iterable[RevertableOp]): + for op in ops: + self.append(op) + + def __len__(self): + return sum(map(len, self._items.values())) + + def __iter__(self): + for key, ops in self._items.items(): + for op in ops: + yield op diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index cd5a89be3..c2654e69f 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -34,7 +34,7 @@ from lbry.wallet.server.tx import TxInput from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.storage import db_class -from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix +from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, RevertableOpStack from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue @@ -229,7 +229,7 @@ class LevelDB: claim_amount = self.get_claim_txo_amount(claim_hash) effective_amount = support_amount + claim_amount - channel_hash = self.get_channel_for_claim(claim_hash) + channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) reposted_claim_hash = self.get_repost(claim_hash) short_url = f'{name}#{claim_hash.hex()}' @@ -410,8 +410,8 @@ class LevelDB: count += 1 return count - def get_channel_for_claim(self, claim_hash) -> Optional[bytes]: - return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash)) + def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: + return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position)) def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]: expired = {} @@ -770,7 +770,7 @@ class LevelDB: keys.append(key) if min_height > 0: for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value, - stop=DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(min_height), + stop=Prefixes.undo.pack_key(min_height), include_value=False): keys.append(key) if keys: @@ -885,7 +885,6 @@ class LevelDB: flush_data.headers.clear() flush_data.block_txs.clear() flush_data.block_hashes.clear() - op_count = len(flush_data.claimtrie_stash) for staged_change in flush_data.claimtrie_stash: # print("ADVANCE", staged_change) if staged_change.is_put: @@ -895,7 +894,7 @@ class LevelDB: flush_data.claimtrie_stash.clear() for undo_ops, height in flush_data.undo: - batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_ops) + batch_put(*Prefixes.undo.pack_item(height, undo_ops)) flush_data.undo.clear() self.fs_height = flush_data.height @@ -964,22 +963,25 @@ class LevelDB: self.hist_flush_count += 1 nremoves = 0 + undo_ops = RevertableOpStack(self.db.get) + + for (packed_ops, height) in reversed(flush_data.undo): + undo_ops.extend(reversed(RevertableOp.unpack_stack(packed_ops))) + undo_ops.append( + RevertableDelete(*Prefixes.undo.pack_item(height, packed_ops)) + ) + with self.db.write_batch() as batch: batch_put = batch.put batch_delete = batch.delete - claim_reorg_height = self.fs_height # print("flush undos", flush_data.undo_claimtrie) - for (packed_ops, height) in reversed(flush_data.undo): - undo_ops = RevertableOp.unpack_stack(packed_ops) - for op in reversed(undo_ops): - # print("REWIND", op) - if op.is_put: - batch_put(op.key, op.value) - else: - batch_delete(op.key) - batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height)) - claim_reorg_height -= 1 + for op in undo_ops: + # print("REWIND", op) + if op.is_put: + batch_put(op.key, op.value) + else: + batch_delete(op.key) flush_data.undo.clear() flush_data.claimtrie_stash.clear() @@ -1209,8 +1211,7 @@ class LevelDB: def read_undo_info(self, height): """Read undo information from a file for the current height.""" - undo_claims = self.db.get(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(self.fs_height)) - return self.db.get(self.undo_key(height)), undo_claims + return self.db.get(self.undo_key(height)), self.db.get(Prefixes.undo.pack_key(self.fs_height)) def raw_block_prefix(self): return 'block'