claim expiration

This commit is contained in:
Jack Robison 2021-02-21 17:26:13 -05:00
parent 923834c784
commit c681041b48
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 136 additions and 23 deletions

View file

@ -1,6 +1,7 @@
import time
import asyncio
import typing
from bisect import bisect_right
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple
@ -16,11 +17,11 @@ from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height
from lbry.wallet.server.udp import StatusServer
if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.db.revertable import RevertableOp
class Prefetcher:
@ -207,6 +208,8 @@ class BlockProcessor:
self.pending_support_txos = {}
self.pending_abandon = set()
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
@ -460,7 +463,8 @@ class BlockProcessor:
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
def _add_claim_or_update(self, txo, script, tx_hash, idx, tx_count, txout, spent_claims):
def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout,
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']:
try:
claim_name = txo.normalized_name
except UnicodeDecodeError:
@ -506,10 +510,9 @@ class BlockProcessor:
root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount(
claim_hash
)
pending = StagedClaimtrieItem(
claim_name, claim_hash, txout.value, support_amount + txout.value,
activation_height, tx_count, idx, root_tx_num, root_idx,
activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx,
signing_channel_hash, channel_claims_count
)
@ -518,7 +521,7 @@ class BlockProcessor:
self.effective_amount_changes[claim_hash].append(txout.value)
return pending.get_add_claim_utxo_ops()
def _add_support(self, txo, txout, idx, tx_count):
def _add_support(self, txo, txout, idx, tx_count) -> List['RevertableOp']:
supported_claim_hash = txo.claim_hash[::-1]
if supported_claim_hash in self.effective_amount_changes:
@ -529,7 +532,6 @@ class BlockProcessor:
return StagedClaimtrieSupport(
supported_claim_hash, tx_count, idx, txout.value
).get_add_support_utxo_ops()
elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon:
if self.db.claim_exists(supported_claim_hash):
_, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount(
@ -549,9 +551,10 @@ class BlockProcessor:
print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}")
return []
def _add_claim_or_support(self, tx_hash, tx_count, idx, txo, txout, script, spent_claims):
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script,
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]) -> List['RevertableOp']:
if script.is_claim_name or script.is_update_claim:
return self._add_claim_or_update(txo, script, tx_hash, idx, tx_count, txout, spent_claims)
return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims)
elif script.is_support_claim or script.is_support_claim_data:
return self._add_support(txo, txout, idx, tx_count)
return []
@ -602,9 +605,10 @@ class BlockProcessor:
)
claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash)
activation_height = 0
height = bisect_right(self.db.tx_counts, tx_num)
spent = StagedClaimtrieItem(
name, prev_claim_hash, prev_amount, prev_effective_amount,
activation_height, txin_num, txin.prev_idx, claim_root_tx_num,
activation_height, get_expiration_height(height), txin_num, txin.prev_idx, claim_root_tx_num,
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
)
spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name)
@ -668,7 +672,9 @@ class BlockProcessor:
)
# print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}")
height = bisect_right(self.db.tx_counts, prev_tx_num)
activation_height = 0
if abandoned_claim_hash in self.effective_amount_changes:
# print("pop")
self.effective_amount_changes.pop(abandoned_claim_hash)
@ -677,10 +683,22 @@ class BlockProcessor:
# print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}")
ops.extend(
StagedClaimtrieItem(
name, abandoned_claim_hash, prev_amount, prev_effective_amount,
activation_height, prev_tx_num, prev_idx, claim_root_tx_num,
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
).get_abandon_ops(self.db.db))
name, abandoned_claim_hash, prev_amount, prev_effective_amount,
activation_height, get_expiration_height(height), prev_tx_num, prev_idx, claim_root_tx_num,
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
).get_abandon_ops(self.db.db)
)
return ops
def _expire_claims(self, height: int):
expired = self.db.get_expired_by_height(height)
spent_claims = {}
ops = []
for expired_claim_hash, (tx_num, position, name, txi) in expired.items():
if (tx_num, position) not in self.pending_claims:
ops.extend(self._spend_claim(txi, spent_claims))
if expired:
ops.extend(self._abandon(spent_claims))
return ops
def advance_block(self, block, height: int):
@ -708,7 +726,7 @@ class BlockProcessor:
append_hashX_by_tx = hashXs_by_tx.append
hashX_from_script = self.coin.hashX_from_script
unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()}
# unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()}
for tx, tx_hash in txs:
# print(f"{tx_hash[::-1].hex()} @ {height}")
@ -745,7 +763,7 @@ class BlockProcessor:
txo = Output(txout.value, script)
claim_or_support_ops = self._add_claim_or_support(
tx_hash, tx_count, idx, txo, txout, script, spent_claims
height, tx_hash, tx_count, idx, txo, txout, script, spent_claims
)
if claim_or_support_ops:
claimtrie_stash_extend(claim_or_support_ops)
@ -761,6 +779,12 @@ class BlockProcessor:
self.db.transaction_num_mapping[tx_hash] = tx_count
tx_count += 1
# handle expired claims
expired_ops = self._expire_claims(height)
if expired_ops:
print(f"************\nexpire claims at block {height}\n************")
claimtrie_stash_extend(expired_ops)
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
_unflushed = self.db.hist_unflushed
_count = 0

View file

@ -14,7 +14,6 @@ from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
# from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.block_processor import LBRYBlockProcessor
@ -214,6 +213,11 @@ class Coin:
txs = cls.DESERIALIZER(raw_block, start=len(header)).read_tx_block()
return Block(raw_block, header, txs)
@classmethod
def transaction(cls, raw_tx: bytes):
"""Return a Block namedtuple given a raw block and its height."""
return cls.DESERIALIZER(raw_tx).read_tx()
@classmethod
def decimal_value(cls, value):
"""Return the number of standard coin units as a Decimal given a

View file

@ -13,6 +13,7 @@ class DB_PREFIXES(enum.Enum):
claim_short_id_prefix = b'F'
claim_effective_amount_prefix = b'D'
claim_expiration = b'O'
undo_claimtrie = b'M'

View file

@ -4,6 +4,21 @@ from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, Re
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes
nOriginalClaimExpirationTime = 262974
nExtendedClaimExpirationTime = 2102400
nExtendedClaimExpirationForkHeight = 400155
nNormalizedNameForkHeight = 539940 # targeting 21 March 2019
nMinTakeoverWorkaroundHeight = 496850
nMaxTakeoverWorkaroundHeight = 658300 # targeting 30 Oct 2019
nWitnessForkHeight = 680770 # targeting 11 Dec 2019
nAllClaimsInMerkleForkHeight = 658310 # targeting 30 Oct 2019
proportionalDelayFactor = 32
def get_expiration_height(last_updated_height: int) -> int:
if last_updated_height < nExtendedClaimExpirationForkHeight:
return last_updated_height + nOriginalClaimExpirationTime
return last_updated_height + nExtendedClaimExpirationTime
def length_encoded_name(name: str) -> bytes:
encoded = name.encode('utf-8')
@ -79,6 +94,7 @@ class StagedClaimtrieItem(typing.NamedTuple):
amount: int
effective_amount: int
activation_height: int
expiration_height: int
tx_num: int
position: int
root_claim_tx_num: int
@ -123,6 +139,13 @@ class StagedClaimtrieItem(typing.NamedTuple):
# claim hash by txo
op(
*Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name)
),
# claim expiration
op(
*Prefixes.claim_expiration.pack_item(
self.expiration_height, self.tx_num, self.position, self.claim_hash,
self.name
)
)
]
if self.signing_hash and self.claims_in_channel_count is not None:

View file

@ -123,6 +123,17 @@ class SupportToClaimValue(typing.NamedTuple):
claim_hash: bytes
class ClaimExpirationKey(typing.NamedTuple):
expiration: int
tx_num: int
position: int
class ClaimExpirationValue(typing.NamedTuple):
claim_hash: bytes
name: str
class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_effective_amount_prefix.value
key_struct = struct.Struct(b'>QLH')
@ -374,6 +385,39 @@ class SupportToClaimPrefixRow(PrefixRow):
cls.pack_value(claim_hash)
class ClaimExpirationPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_expiration.value
key_struct = struct.Struct(b'>LLH')
value_struct = struct.Struct(b'>20s')
@classmethod
def pack_key(cls, expiration: int, tx_num: int, position: int) -> bytes:
return super().pack_key(expiration, tx_num, position)
@classmethod
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
return cls.value_struct.pack(claim_hash) + length_encoded_name(name)
@classmethod
def pack_item(cls, expiration: int, tx_num: int, position: int, claim_hash: bytes, name: str) -> typing.Tuple[bytes, bytes]:
return cls.pack_key(expiration, tx_num, position), cls.pack_value(claim_hash, name)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimExpirationKey:
return ClaimExpirationKey(*super().unpack_key(key))
@classmethod
def unpack_value(cls, data: bytes) -> ClaimExpirationValue:
name_len = int.from_bytes(data[20:22], byteorder='big')
name = data[22:22 + name_len].decode()
claim_id, = cls.value_struct.unpack(data[:20])
return ClaimExpirationValue(claim_id, name)
@classmethod
def unpack_item(cls, key: bytes, value: bytes) -> typing.Tuple[ClaimExpirationKey, ClaimExpirationValue]:
return cls.unpack_key(key), cls.unpack_value(value)
class Prefixes:
claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow
@ -385,7 +429,7 @@ class Prefixes:
channel_to_claim = ChannelToClaimPrefixRow
claim_short_id = ClaimShortIDPrefixRow
claim_effective_amount = EffectiveAmountPrefixRow
claim_expiration = ClaimExpirationPrefixRow
undo_claimtrie = b'M'
# undo_claimtrie = b'M'

View file

@ -30,6 +30,7 @@ from lbry.utils import LRUCacheWithMetrics
from lbry.schema.url import URL
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN
from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class
@ -37,6 +38,7 @@ from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, Re
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
from lbry.wallet.server.db.claimtrie import get_expiration_height
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@ -188,8 +190,8 @@ class LevelDB:
created_height = bisect_right(self.tx_counts, root_tx_num)
last_take_over_height = 0
activation_height = created_height
expiration_height = 0
expiration_height = get_expiration_height(height)
support_amount = self.get_support_amount(claim_hash)
effective_amount = self.get_effective_amount(claim_hash)
channel_hash = self.get_channel_for_claim(claim_hash)
@ -324,16 +326,17 @@ class LevelDB:
root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount(
claim_hash
)
activation_height = 0
height = bisect_right(self.tx_counts, tx_num)
effective_amount = self.db.get_support_amount(claim_hash) + value
signing_hash = self.get_channel_for_claim(claim_hash)
activation_height = 0
if signing_hash:
count = self.get_claims_in_channel_count(signing_hash)
else:
count = 0
return StagedClaimtrieItem(
name, claim_hash, value, effective_amount, activation_height, tx_num, idx, root_tx_num, root_idx,
signing_hash, count
name, claim_hash, value, effective_amount, activation_height, get_expiration_height(height), tx_num, idx,
root_tx_num, root_idx, signing_hash, count
)
def get_effective_amount(self, claim_hash):
@ -365,6 +368,20 @@ class LevelDB:
def get_channel_for_claim(self, claim_hash) -> Optional[bytes]:
return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash)
def get_expired_by_height(self, height: int):
expired = {}
for _k, _v in self.db.iterator(prefix=DB_PREFIXES.claim_expiration.value + struct.pack(b'>L', height)):
k, v = Prefixes.claim_expiration.unpack_item(_k, _v)
tx_hash = self.total_transactions[k.tx_num]
tx = self.coin.transaction(self.db.get(DB_PREFIXES.TX_PREFIX.value + tx_hash))
# treat it like a claim spend so it will delete/abandon properly
# the _spend_claim function this result is fed to expects a txi, so make a mock one
expired[v.claim_hash] = (
k.tx_num, k.position, v.name,
TxInput(prev_hash=tx_hash, prev_idx=k.position, script=tx.outputs[k.position].pk_script, sequence=0)
)
return expired
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
# unflushed = self.history.unflushed
# count = 0