diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 90eb1eebc..117f5fb0a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1,6 +1,7 @@ import time import asyncio import typing +from bisect import bisect_right from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, List, Tuple @@ -16,11 +17,11 @@ from lbry.wallet.server.util import chunks, class_logger from lbry.crypto.hash import hash160 from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.db import DB_PREFIXES -from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport - +from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height from lbry.wallet.server.udp import StatusServer if typing.TYPE_CHECKING: from lbry.wallet.server.leveldb import LevelDB + from lbry.wallet.server.db.revertable import RevertableOp class Prefetcher: @@ -207,6 +208,8 @@ class BlockProcessor: self.pending_support_txos = {} self.pending_abandon = set() + + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -460,7 +463,8 @@ class BlockProcessor: self.history_cache.clear() self.notifications.notified_mempool_txs.clear() - def _add_claim_or_update(self, txo, script, tx_hash, idx, tx_count, txout, spent_claims): + def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout, + spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']: try: claim_name = txo.normalized_name except UnicodeDecodeError: @@ -506,10 +510,9 @@ class BlockProcessor: root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( claim_hash ) - pending = StagedClaimtrieItem( claim_name, claim_hash, txout.value, support_amount + txout.value, - activation_height, tx_count, idx, root_tx_num, root_idx, + activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx, signing_channel_hash, channel_claims_count ) @@ -518,7 +521,7 @@ class BlockProcessor: self.effective_amount_changes[claim_hash].append(txout.value) return pending.get_add_claim_utxo_ops() - def _add_support(self, txo, txout, idx, tx_count): + def _add_support(self, txo, txout, idx, tx_count) -> List['RevertableOp']: supported_claim_hash = txo.claim_hash[::-1] if supported_claim_hash in self.effective_amount_changes: @@ -529,7 +532,6 @@ class BlockProcessor: return StagedClaimtrieSupport( supported_claim_hash, tx_count, idx, txout.value ).get_add_support_utxo_ops() - elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon: if self.db.claim_exists(supported_claim_hash): _, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount( @@ -549,9 +551,10 @@ class BlockProcessor: print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}") return [] - def _add_claim_or_support(self, tx_hash, tx_count, idx, txo, txout, script, spent_claims): + def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script, + spent_claims: typing.Dict[bytes, Tuple[int, int, str]]) -> List['RevertableOp']: if script.is_claim_name or script.is_update_claim: - return self._add_claim_or_update(txo, script, tx_hash, idx, tx_count, txout, spent_claims) + return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims) elif script.is_support_claim or script.is_support_claim_data: return self._add_support(txo, txout, idx, tx_count) return [] @@ -602,9 +605,10 @@ class BlockProcessor: ) claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) activation_height = 0 + height = bisect_right(self.db.tx_counts, tx_num) spent = StagedClaimtrieItem( name, prev_claim_hash, prev_amount, prev_effective_amount, - activation_height, txin_num, txin.prev_idx, claim_root_tx_num, + activation_height, get_expiration_height(height), txin_num, txin.prev_idx, claim_root_tx_num, claim_root_idx, prev_signing_hash, prev_claims_in_channel_count ) spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name) @@ -668,7 +672,9 @@ class BlockProcessor: ) # print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}") + height = bisect_right(self.db.tx_counts, prev_tx_num) activation_height = 0 + if abandoned_claim_hash in self.effective_amount_changes: # print("pop") self.effective_amount_changes.pop(abandoned_claim_hash) @@ -677,10 +683,22 @@ class BlockProcessor: # print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}") ops.extend( StagedClaimtrieItem( - name, abandoned_claim_hash, prev_amount, prev_effective_amount, - activation_height, prev_tx_num, prev_idx, claim_root_tx_num, - claim_root_idx, prev_signing_hash, prev_claims_in_channel_count - ).get_abandon_ops(self.db.db)) + name, abandoned_claim_hash, prev_amount, prev_effective_amount, + activation_height, get_expiration_height(height), prev_tx_num, prev_idx, claim_root_tx_num, + claim_root_idx, prev_signing_hash, prev_claims_in_channel_count + ).get_abandon_ops(self.db.db) + ) + return ops + + def _expire_claims(self, height: int): + expired = self.db.get_expired_by_height(height) + spent_claims = {} + ops = [] + for expired_claim_hash, (tx_num, position, name, txi) in expired.items(): + if (tx_num, position) not in self.pending_claims: + ops.extend(self._spend_claim(txi, spent_claims)) + if expired: + ops.extend(self._abandon(spent_claims)) return ops def advance_block(self, block, height: int): @@ -708,7 +726,7 @@ class BlockProcessor: append_hashX_by_tx = hashXs_by_tx.append hashX_from_script = self.coin.hashX_from_script - unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} + # unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} for tx, tx_hash in txs: # print(f"{tx_hash[::-1].hex()} @ {height}") @@ -745,7 +763,7 @@ class BlockProcessor: txo = Output(txout.value, script) claim_or_support_ops = self._add_claim_or_support( - tx_hash, tx_count, idx, txo, txout, script, spent_claims + height, tx_hash, tx_count, idx, txo, txout, script, spent_claims ) if claim_or_support_ops: claimtrie_stash_extend(claim_or_support_ops) @@ -761,6 +779,12 @@ class BlockProcessor: self.db.transaction_num_mapping[tx_hash] = tx_count tx_count += 1 + # handle expired claims + expired_ops = self._expire_claims(height) + if expired_ops: + print(f"************\nexpire claims at block {height}\n************") + claimtrie_stash_extend(expired_ops) + # self.db.add_unflushed(hashXs_by_tx, self.tx_count) _unflushed = self.db.hist_unflushed _count = 0 diff --git a/lbry/wallet/server/coin.py b/lbry/wallet/server/coin.py index fd43a7053..3ef4b83f9 100644 --- a/lbry/wallet/server/coin.py +++ b/lbry/wallet/server/coin.py @@ -14,7 +14,6 @@ from lbry.wallet.server.daemon import Daemon, LBCDaemon from lbry.wallet.server.script import ScriptPubKey, OpCodes from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager -# from lbry.wallet.server.db.writer import LBRYLevelDB from lbry.wallet.server.block_processor import LBRYBlockProcessor @@ -214,6 +213,11 @@ class Coin: txs = cls.DESERIALIZER(raw_block, start=len(header)).read_tx_block() return Block(raw_block, header, txs) + @classmethod + def transaction(cls, raw_tx: bytes): + """Return a Block namedtuple given a raw block and its height.""" + return cls.DESERIALIZER(raw_tx).read_tx() + @classmethod def decimal_value(cls, value): """Return the number of standard coin units as a Decimal given a diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index 31237d207..47293217d 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -13,6 +13,7 @@ class DB_PREFIXES(enum.Enum): claim_short_id_prefix = b'F' claim_effective_amount_prefix = b'D' + claim_expiration = b'O' undo_claimtrie = b'M' diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index e43b413c5..055e68912 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -4,6 +4,21 @@ from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, Re from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes +nOriginalClaimExpirationTime = 262974 +nExtendedClaimExpirationTime = 2102400 +nExtendedClaimExpirationForkHeight = 400155 +nNormalizedNameForkHeight = 539940 # targeting 21 March 2019 +nMinTakeoverWorkaroundHeight = 496850 +nMaxTakeoverWorkaroundHeight = 658300 # targeting 30 Oct 2019 +nWitnessForkHeight = 680770 # targeting 11 Dec 2019 +nAllClaimsInMerkleForkHeight = 658310 # targeting 30 Oct 2019 +proportionalDelayFactor = 32 + +def get_expiration_height(last_updated_height: int) -> int: + if last_updated_height < nExtendedClaimExpirationForkHeight: + return last_updated_height + nOriginalClaimExpirationTime + return last_updated_height + nExtendedClaimExpirationTime + def length_encoded_name(name: str) -> bytes: encoded = name.encode('utf-8') @@ -79,6 +94,7 @@ class StagedClaimtrieItem(typing.NamedTuple): amount: int effective_amount: int activation_height: int + expiration_height: int tx_num: int position: int root_claim_tx_num: int @@ -123,6 +139,13 @@ class StagedClaimtrieItem(typing.NamedTuple): # claim hash by txo op( *Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name) + ), + # claim expiration + op( + *Prefixes.claim_expiration.pack_item( + self.expiration_height, self.tx_num, self.position, self.claim_hash, + self.name + ) ) ] if self.signing_hash and self.claims_in_channel_count is not None: diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 3b1657af9..c69d4f3ac 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -123,6 +123,17 @@ class SupportToClaimValue(typing.NamedTuple): claim_hash: bytes +class ClaimExpirationKey(typing.NamedTuple): + expiration: int + tx_num: int + position: int + + +class ClaimExpirationValue(typing.NamedTuple): + claim_hash: bytes + name: str + + class EffectiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_effective_amount_prefix.value key_struct = struct.Struct(b'>QLH') @@ -374,6 +385,39 @@ class SupportToClaimPrefixRow(PrefixRow): cls.pack_value(claim_hash) +class ClaimExpirationPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_expiration.value + key_struct = struct.Struct(b'>LLH') + value_struct = struct.Struct(b'>20s') + + @classmethod + def pack_key(cls, expiration: int, tx_num: int, position: int) -> bytes: + return super().pack_key(expiration, tx_num, position) + + @classmethod + def pack_value(cls, claim_hash: bytes, name: str) -> bytes: + return cls.value_struct.pack(claim_hash) + length_encoded_name(name) + + @classmethod + def pack_item(cls, expiration: int, tx_num: int, position: int, claim_hash: bytes, name: str) -> typing.Tuple[bytes, bytes]: + return cls.pack_key(expiration, tx_num, position), cls.pack_value(claim_hash, name) + + @classmethod + def unpack_key(cls, key: bytes) -> ClaimExpirationKey: + return ClaimExpirationKey(*super().unpack_key(key)) + + @classmethod + def unpack_value(cls, data: bytes) -> ClaimExpirationValue: + name_len = int.from_bytes(data[20:22], byteorder='big') + name = data[22:22 + name_len].decode() + claim_id, = cls.value_struct.unpack(data[:20]) + return ClaimExpirationValue(claim_id, name) + + @classmethod + def unpack_item(cls, key: bytes, value: bytes) -> typing.Tuple[ClaimExpirationKey, ClaimExpirationValue]: + return cls.unpack_key(key), cls.unpack_value(value) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -385,7 +429,7 @@ class Prefixes: channel_to_claim = ChannelToClaimPrefixRow claim_short_id = ClaimShortIDPrefixRow - claim_effective_amount = EffectiveAmountPrefixRow + claim_expiration = ClaimExpirationPrefixRow - undo_claimtrie = b'M' + # undo_claimtrie = b'M' diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index e5d18fbbf..456de2526 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -30,6 +30,7 @@ from lbry.utils import LRUCacheWithMetrics from lbry.schema.url import URL from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN +from lbry.wallet.server.tx import TxInput from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.storage import db_class @@ -37,6 +38,7 @@ from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, Re from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name +from lbry.wallet.server.db.claimtrie import get_expiration_height UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") @@ -188,8 +190,8 @@ class LevelDB: created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = 0 activation_height = created_height - expiration_height = 0 + expiration_height = get_expiration_height(height) support_amount = self.get_support_amount(claim_hash) effective_amount = self.get_effective_amount(claim_hash) channel_hash = self.get_channel_for_claim(claim_hash) @@ -324,16 +326,17 @@ class LevelDB: root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount( claim_hash ) - activation_height = 0 + height = bisect_right(self.tx_counts, tx_num) effective_amount = self.db.get_support_amount(claim_hash) + value signing_hash = self.get_channel_for_claim(claim_hash) + activation_height = 0 if signing_hash: count = self.get_claims_in_channel_count(signing_hash) else: count = 0 return StagedClaimtrieItem( - name, claim_hash, value, effective_amount, activation_height, tx_num, idx, root_tx_num, root_idx, - signing_hash, count + name, claim_hash, value, effective_amount, activation_height, get_expiration_height(height), tx_num, idx, + root_tx_num, root_idx, signing_hash, count ) def get_effective_amount(self, claim_hash): @@ -365,6 +368,20 @@ class LevelDB: def get_channel_for_claim(self, claim_hash) -> Optional[bytes]: return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash) + def get_expired_by_height(self, height: int): + expired = {} + for _k, _v in self.db.iterator(prefix=DB_PREFIXES.claim_expiration.value + struct.pack(b'>L', height)): + k, v = Prefixes.claim_expiration.unpack_item(_k, _v) + tx_hash = self.total_transactions[k.tx_num] + tx = self.coin.transaction(self.db.get(DB_PREFIXES.TX_PREFIX.value + tx_hash)) + # treat it like a claim spend so it will delete/abandon properly + # the _spend_claim function this result is fed to expects a txi, so make a mock one + expired[v.claim_hash] = ( + k.tx_num, k.position, v.name, + TxInput(prev_hash=tx_hash, prev_idx=k.position, script=tx.outputs[k.position].pk_script, sequence=0) + ) + return expired + # def add_unflushed(self, hashXs_by_tx, first_tx_num): # unflushed = self.history.unflushed # count = 0