claim expiration

This commit is contained in:
Jack Robison 2021-02-21 17:26:13 -05:00 committed by Victor Shyba
parent d57cd5acd7
commit 06841a4fde
6 changed files with 136 additions and 23 deletions

View file

@ -1,6 +1,7 @@
import time import time
import asyncio import asyncio
import typing import typing
from bisect import bisect_right
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple from typing import Optional, List, Tuple
@ -16,11 +17,11 @@ from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160 from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.db.revertable import RevertableOp
class Prefetcher: class Prefetcher:
@ -207,6 +208,8 @@ class BlockProcessor:
self.pending_support_txos = {} self.pending_support_txos = {}
self.pending_abandon = set() self.pending_abandon = set()
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -460,7 +463,8 @@ class BlockProcessor:
self.history_cache.clear() self.history_cache.clear()
self.notifications.notified_mempool_txs.clear() self.notifications.notified_mempool_txs.clear()
def _add_claim_or_update(self, txo, script, tx_hash, idx, tx_count, txout, spent_claims): def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout,
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']:
try: try:
claim_name = txo.normalized_name claim_name = txo.normalized_name
except UnicodeDecodeError: except UnicodeDecodeError:
@ -506,10 +510,9 @@ class BlockProcessor:
root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount(
claim_hash claim_hash
) )
pending = StagedClaimtrieItem( pending = StagedClaimtrieItem(
claim_name, claim_hash, txout.value, support_amount + txout.value, claim_name, claim_hash, txout.value, support_amount + txout.value,
activation_height, tx_count, idx, root_tx_num, root_idx, activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx,
signing_channel_hash, channel_claims_count signing_channel_hash, channel_claims_count
) )
@ -518,7 +521,7 @@ class BlockProcessor:
self.effective_amount_changes[claim_hash].append(txout.value) self.effective_amount_changes[claim_hash].append(txout.value)
return pending.get_add_claim_utxo_ops() return pending.get_add_claim_utxo_ops()
def _add_support(self, txo, txout, idx, tx_count): def _add_support(self, txo, txout, idx, tx_count) -> List['RevertableOp']:
supported_claim_hash = txo.claim_hash[::-1] supported_claim_hash = txo.claim_hash[::-1]
if supported_claim_hash in self.effective_amount_changes: if supported_claim_hash in self.effective_amount_changes:
@ -529,7 +532,6 @@ class BlockProcessor:
return StagedClaimtrieSupport( return StagedClaimtrieSupport(
supported_claim_hash, tx_count, idx, txout.value supported_claim_hash, tx_count, idx, txout.value
).get_add_support_utxo_ops() ).get_add_support_utxo_ops()
elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon: elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon:
if self.db.claim_exists(supported_claim_hash): if self.db.claim_exists(supported_claim_hash):
_, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount( _, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount(
@ -549,9 +551,10 @@ class BlockProcessor:
print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}") print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}")
return [] return []
def _add_claim_or_support(self, tx_hash, tx_count, idx, txo, txout, script, spent_claims): def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script,
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]) -> List['RevertableOp']:
if script.is_claim_name or script.is_update_claim: if script.is_claim_name or script.is_update_claim:
return self._add_claim_or_update(txo, script, tx_hash, idx, tx_count, txout, spent_claims) return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims)
elif script.is_support_claim or script.is_support_claim_data: elif script.is_support_claim or script.is_support_claim_data:
return self._add_support(txo, txout, idx, tx_count) return self._add_support(txo, txout, idx, tx_count)
return [] return []
@ -602,9 +605,10 @@ class BlockProcessor:
) )
claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash)
activation_height = 0 activation_height = 0
height = bisect_right(self.db.tx_counts, tx_num)
spent = StagedClaimtrieItem( spent = StagedClaimtrieItem(
name, prev_claim_hash, prev_amount, prev_effective_amount, name, prev_claim_hash, prev_amount, prev_effective_amount,
activation_height, txin_num, txin.prev_idx, claim_root_tx_num, activation_height, get_expiration_height(height), txin_num, txin.prev_idx, claim_root_tx_num,
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
) )
spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name) spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name)
@ -668,7 +672,9 @@ class BlockProcessor:
) )
# print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}") # print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}")
height = bisect_right(self.db.tx_counts, prev_tx_num)
activation_height = 0 activation_height = 0
if abandoned_claim_hash in self.effective_amount_changes: if abandoned_claim_hash in self.effective_amount_changes:
# print("pop") # print("pop")
self.effective_amount_changes.pop(abandoned_claim_hash) self.effective_amount_changes.pop(abandoned_claim_hash)
@ -678,9 +684,21 @@ class BlockProcessor:
ops.extend( ops.extend(
StagedClaimtrieItem( StagedClaimtrieItem(
name, abandoned_claim_hash, prev_amount, prev_effective_amount, name, abandoned_claim_hash, prev_amount, prev_effective_amount,
activation_height, prev_tx_num, prev_idx, claim_root_tx_num, activation_height, get_expiration_height(height), prev_tx_num, prev_idx, claim_root_tx_num,
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
).get_abandon_ops(self.db.db)) ).get_abandon_ops(self.db.db)
)
return ops
def _expire_claims(self, height: int):
expired = self.db.get_expired_by_height(height)
spent_claims = {}
ops = []
for expired_claim_hash, (tx_num, position, name, txi) in expired.items():
if (tx_num, position) not in self.pending_claims:
ops.extend(self._spend_claim(txi, spent_claims))
if expired:
ops.extend(self._abandon(spent_claims))
return ops return ops
def advance_block(self, block, height: int): def advance_block(self, block, height: int):
@ -708,7 +726,7 @@ class BlockProcessor:
append_hashX_by_tx = hashXs_by_tx.append append_hashX_by_tx = hashXs_by_tx.append
hashX_from_script = self.coin.hashX_from_script hashX_from_script = self.coin.hashX_from_script
unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} # unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()}
for tx, tx_hash in txs: for tx, tx_hash in txs:
# print(f"{tx_hash[::-1].hex()} @ {height}") # print(f"{tx_hash[::-1].hex()} @ {height}")
@ -745,7 +763,7 @@ class BlockProcessor:
txo = Output(txout.value, script) txo = Output(txout.value, script)
claim_or_support_ops = self._add_claim_or_support( claim_or_support_ops = self._add_claim_or_support(
tx_hash, tx_count, idx, txo, txout, script, spent_claims height, tx_hash, tx_count, idx, txo, txout, script, spent_claims
) )
if claim_or_support_ops: if claim_or_support_ops:
claimtrie_stash_extend(claim_or_support_ops) claimtrie_stash_extend(claim_or_support_ops)
@ -761,6 +779,12 @@ class BlockProcessor:
self.db.transaction_num_mapping[tx_hash] = tx_count self.db.transaction_num_mapping[tx_hash] = tx_count
tx_count += 1 tx_count += 1
# handle expired claims
expired_ops = self._expire_claims(height)
if expired_ops:
print(f"************\nexpire claims at block {height}\n************")
claimtrie_stash_extend(expired_ops)
# self.db.add_unflushed(hashXs_by_tx, self.tx_count) # self.db.add_unflushed(hashXs_by_tx, self.tx_count)
_unflushed = self.db.hist_unflushed _unflushed = self.db.hist_unflushed
_count = 0 _count = 0

View file

@ -14,7 +14,6 @@ from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
# from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.block_processor import LBRYBlockProcessor
@ -214,6 +213,11 @@ class Coin:
txs = cls.DESERIALIZER(raw_block, start=len(header)).read_tx_block() txs = cls.DESERIALIZER(raw_block, start=len(header)).read_tx_block()
return Block(raw_block, header, txs) return Block(raw_block, header, txs)
@classmethod
def transaction(cls, raw_tx: bytes):
"""Return a Block namedtuple given a raw block and its height."""
return cls.DESERIALIZER(raw_tx).read_tx()
@classmethod @classmethod
def decimal_value(cls, value): def decimal_value(cls, value):
"""Return the number of standard coin units as a Decimal given a """Return the number of standard coin units as a Decimal given a

View file

@ -13,6 +13,7 @@ class DB_PREFIXES(enum.Enum):
claim_short_id_prefix = b'F' claim_short_id_prefix = b'F'
claim_effective_amount_prefix = b'D' claim_effective_amount_prefix = b'D'
claim_expiration = b'O'
undo_claimtrie = b'M' undo_claimtrie = b'M'

View file

@ -4,6 +4,21 @@ from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, Re
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.prefixes import Prefixes
nOriginalClaimExpirationTime = 262974
nExtendedClaimExpirationTime = 2102400
nExtendedClaimExpirationForkHeight = 400155
nNormalizedNameForkHeight = 539940 # targeting 21 March 2019
nMinTakeoverWorkaroundHeight = 496850
nMaxTakeoverWorkaroundHeight = 658300 # targeting 30 Oct 2019
nWitnessForkHeight = 680770 # targeting 11 Dec 2019
nAllClaimsInMerkleForkHeight = 658310 # targeting 30 Oct 2019
proportionalDelayFactor = 32
def get_expiration_height(last_updated_height: int) -> int:
if last_updated_height < nExtendedClaimExpirationForkHeight:
return last_updated_height + nOriginalClaimExpirationTime
return last_updated_height + nExtendedClaimExpirationTime
def length_encoded_name(name: str) -> bytes: def length_encoded_name(name: str) -> bytes:
encoded = name.encode('utf-8') encoded = name.encode('utf-8')
@ -79,6 +94,7 @@ class StagedClaimtrieItem(typing.NamedTuple):
amount: int amount: int
effective_amount: int effective_amount: int
activation_height: int activation_height: int
expiration_height: int
tx_num: int tx_num: int
position: int position: int
root_claim_tx_num: int root_claim_tx_num: int
@ -123,6 +139,13 @@ class StagedClaimtrieItem(typing.NamedTuple):
# claim hash by txo # claim hash by txo
op( op(
*Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name) *Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name)
),
# claim expiration
op(
*Prefixes.claim_expiration.pack_item(
self.expiration_height, self.tx_num, self.position, self.claim_hash,
self.name
)
) )
] ]
if self.signing_hash and self.claims_in_channel_count is not None: if self.signing_hash and self.claims_in_channel_count is not None:

View file

@ -123,6 +123,17 @@ class SupportToClaimValue(typing.NamedTuple):
claim_hash: bytes claim_hash: bytes
class ClaimExpirationKey(typing.NamedTuple):
expiration: int
tx_num: int
position: int
class ClaimExpirationValue(typing.NamedTuple):
claim_hash: bytes
name: str
class EffectiveAmountPrefixRow(PrefixRow): class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_effective_amount_prefix.value prefix = DB_PREFIXES.claim_effective_amount_prefix.value
key_struct = struct.Struct(b'>QLH') key_struct = struct.Struct(b'>QLH')
@ -374,6 +385,39 @@ class SupportToClaimPrefixRow(PrefixRow):
cls.pack_value(claim_hash) cls.pack_value(claim_hash)
class ClaimExpirationPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_expiration.value
key_struct = struct.Struct(b'>LLH')
value_struct = struct.Struct(b'>20s')
@classmethod
def pack_key(cls, expiration: int, tx_num: int, position: int) -> bytes:
return super().pack_key(expiration, tx_num, position)
@classmethod
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
return cls.value_struct.pack(claim_hash) + length_encoded_name(name)
@classmethod
def pack_item(cls, expiration: int, tx_num: int, position: int, claim_hash: bytes, name: str) -> typing.Tuple[bytes, bytes]:
return cls.pack_key(expiration, tx_num, position), cls.pack_value(claim_hash, name)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimExpirationKey:
return ClaimExpirationKey(*super().unpack_key(key))
@classmethod
def unpack_value(cls, data: bytes) -> ClaimExpirationValue:
name_len = int.from_bytes(data[20:22], byteorder='big')
name = data[22:22 + name_len].decode()
claim_id, = cls.value_struct.unpack(data[:20])
return ClaimExpirationValue(claim_id, name)
@classmethod
def unpack_item(cls, key: bytes, value: bytes) -> typing.Tuple[ClaimExpirationKey, ClaimExpirationValue]:
return cls.unpack_key(key), cls.unpack_value(value)
class Prefixes: class Prefixes:
claim_to_support = ClaimToSupportPrefixRow claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow support_to_claim = SupportToClaimPrefixRow
@ -385,7 +429,7 @@ class Prefixes:
channel_to_claim = ChannelToClaimPrefixRow channel_to_claim = ChannelToClaimPrefixRow
claim_short_id = ClaimShortIDPrefixRow claim_short_id = ClaimShortIDPrefixRow
claim_effective_amount = EffectiveAmountPrefixRow claim_effective_amount = EffectiveAmountPrefixRow
claim_expiration = ClaimExpirationPrefixRow
undo_claimtrie = b'M' # undo_claimtrie = b'M'

View file

@ -30,6 +30,7 @@ from lbry.utils import LRUCacheWithMetrics
from lbry.schema.url import URL from lbry.schema.url import URL
from lbry.wallet.server import util from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN
from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class from lbry.wallet.server.storage import db_class
@ -37,6 +38,7 @@ from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, Re
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
from lbry.wallet.server.db.claimtrie import get_expiration_height
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@ -188,8 +190,8 @@ class LevelDB:
created_height = bisect_right(self.tx_counts, root_tx_num) created_height = bisect_right(self.tx_counts, root_tx_num)
last_take_over_height = 0 last_take_over_height = 0
activation_height = created_height activation_height = created_height
expiration_height = 0
expiration_height = get_expiration_height(height)
support_amount = self.get_support_amount(claim_hash) support_amount = self.get_support_amount(claim_hash)
effective_amount = self.get_effective_amount(claim_hash) effective_amount = self.get_effective_amount(claim_hash)
channel_hash = self.get_channel_for_claim(claim_hash) channel_hash = self.get_channel_for_claim(claim_hash)
@ -324,16 +326,17 @@ class LevelDB:
root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount( root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount(
claim_hash claim_hash
) )
activation_height = 0 height = bisect_right(self.tx_counts, tx_num)
effective_amount = self.db.get_support_amount(claim_hash) + value effective_amount = self.db.get_support_amount(claim_hash) + value
signing_hash = self.get_channel_for_claim(claim_hash) signing_hash = self.get_channel_for_claim(claim_hash)
activation_height = 0
if signing_hash: if signing_hash:
count = self.get_claims_in_channel_count(signing_hash) count = self.get_claims_in_channel_count(signing_hash)
else: else:
count = 0 count = 0
return StagedClaimtrieItem( return StagedClaimtrieItem(
name, claim_hash, value, effective_amount, activation_height, tx_num, idx, root_tx_num, root_idx, name, claim_hash, value, effective_amount, activation_height, get_expiration_height(height), tx_num, idx,
signing_hash, count root_tx_num, root_idx, signing_hash, count
) )
def get_effective_amount(self, claim_hash): def get_effective_amount(self, claim_hash):
@ -365,6 +368,20 @@ class LevelDB:
def get_channel_for_claim(self, claim_hash) -> Optional[bytes]: def get_channel_for_claim(self, claim_hash) -> Optional[bytes]:
return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash) return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash)
def get_expired_by_height(self, height: int):
expired = {}
for _k, _v in self.db.iterator(prefix=DB_PREFIXES.claim_expiration.value + struct.pack(b'>L', height)):
k, v = Prefixes.claim_expiration.unpack_item(_k, _v)
tx_hash = self.total_transactions[k.tx_num]
tx = self.coin.transaction(self.db.get(DB_PREFIXES.TX_PREFIX.value + tx_hash))
# treat it like a claim spend so it will delete/abandon properly
# the _spend_claim function this result is fed to expects a txi, so make a mock one
expired[v.claim_hash] = (
k.tx_num, k.position, v.name,
TxInput(prev_hash=tx_hash, prev_idx=k.position, script=tx.outputs[k.position].pk_script, sequence=0)
)
return expired
# def add_unflushed(self, hashXs_by_tx, first_tx_num): # def add_unflushed(self, hashXs_by_tx, first_tx_num):
# unflushed = self.history.unflushed # unflushed = self.history.unflushed
# count = 0 # count = 0