claim activations and takeovers (WIP)

This commit is contained in:
Jack Robison 2021-05-05 16:17:32 -04:00
parent aa3b18f848
commit 9a11ac06bf
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 572 additions and 124 deletions

View file

@ -174,7 +174,6 @@ class Outputs:
@classmethod @classmethod
def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> bytes: def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> bytes:
extra_txo_rows = {row['claim_hash']: row for row in extra_txo_rows}
page = OutputsMessage() page = OutputsMessage()
page.offset = offset page.offset = offset
if total is not None: if total is not None:
@ -221,7 +220,7 @@ class Outputs:
if resolve_result.canonical_url is not None: if resolve_result.canonical_url is not None:
txo_message.claim.canonical_url = resolve_result.canonical_url txo_message.claim.canonical_url = resolve_result.canonical_url
if resolve_result.last_take_over_height is not None: if resolve_result.last_takeover_height is not None:
txo_message.claim.take_over_height = resolve_result.last_take_over_height txo_message.claim.take_over_height = resolve_result.last_takeover_height
if resolve_result.claims_in_channel is not None: if resolve_result.claims_in_channel is not None:
txo_message.claim.claims_in_channel = resolve_result.claims_in_channel txo_message.claim.claims_in_channel = resolve_result.claims_in_channel

View file

@ -18,6 +18,9 @@ from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, get_force_activate_ops, get_delay_for_name
from lbry.wallet.server.db.prefixes import PendingClaimActivationPrefixRow, Prefixes
from lbry.wallet.server.db.revertable import RevertablePut
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
@ -202,13 +205,12 @@ class BlockProcessor:
self.history_cache = {} self.history_cache = {}
self.status_server = StatusServer() self.status_server = StatusServer()
self.effective_amount_changes = defaultdict(list) self.effective_amount_changes = defaultdict(list)
self.pending_claims = {} self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {}
self.pending_claim_txos = {} self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {}
self.pending_supports = defaultdict(set) self.pending_supports = defaultdict(set)
self.pending_support_txos = {} self.pending_support_txos = {}
self.pending_abandon = set() self.pending_abandon = set()
self.staged_pending_abandoned = {}
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
@ -239,7 +241,6 @@ class BlockProcessor:
try: try:
for block in blocks: for block in blocks:
await self.run_in_thread_with_lock(self.advance_block, block) await self.run_in_thread_with_lock(self.advance_block, block)
print("advanced\n")
except: except:
self.logger.exception("advance blocks failed") self.logger.exception("advance blocks failed")
raise raise
@ -412,7 +413,8 @@ class BlockProcessor:
return None return None
def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout, def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout,
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']: spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]],
zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']:
try: try:
claim_name = txo.normalized_name claim_name = txo.normalized_name
except UnicodeDecodeError: except UnicodeDecodeError:
@ -425,7 +427,13 @@ class BlockProcessor:
signing_channel_hash = None signing_channel_hash = None
channel_claims_count = 0 channel_claims_count = 0
activation_height = 0 activation_delay = self.db.get_activation_delay(claim_hash, claim_name)
if activation_delay == 0:
zero_delay_claims[(claim_name, claim_hash)] = tx_count, idx
# else:
# print("delay activation ", claim_name, activation_delay, height)
activation_height = activation_delay + height
try: try:
signable = txo.signable signable = txo.signable
except: # google.protobuf.message.DecodeError: Could not parse JSON. except: # google.protobuf.message.DecodeError: Could not parse JSON.
@ -455,9 +463,13 @@ class BlockProcessor:
root_idx = previous_claim.root_claim_tx_position root_idx = previous_claim.root_claim_tx_position
# prev_amount = previous_claim.amount # prev_amount = previous_claim.amount
else: else:
root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( k, v = self.db.get_root_claim_txo_and_current_amount(
claim_hash claim_hash
) )
root_tx_num = v.root_tx_num
root_idx = v.root_position
prev_amount = v.amount
pending = StagedClaimtrieItem( pending = StagedClaimtrieItem(
claim_name, claim_hash, txout.value, support_amount + txout.value, claim_name, claim_hash, txout.value, support_amount + txout.value,
activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx, activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx,
@ -469,11 +481,25 @@ class BlockProcessor:
self.effective_amount_changes[claim_hash].append(txout.value) self.effective_amount_changes[claim_hash].append(txout.value)
return pending.get_add_claim_utxo_ops() return pending.get_add_claim_utxo_ops()
def _add_support(self, txo, txout, idx, tx_count) -> List['RevertableOp']: def _add_support(self, height, txo, txout, idx, tx_count,
zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']:
supported_claim_hash = txo.claim_hash[::-1] supported_claim_hash = txo.claim_hash[::-1]
claim_info = self.db.get_root_claim_txo_and_current_amount(
supported_claim_hash
)
controlling_claim = None
supported_tx_num = supported_position = supported_activation_height = supported_name = None
if claim_info:
k, v = claim_info
supported_name = v.name
supported_tx_num = k.tx_num
supported_position = k.position
supported_activation_height = v.activation
controlling_claim = self.db.get_controlling_claim(v.name)
if supported_claim_hash in self.effective_amount_changes: if supported_claim_hash in self.effective_amount_changes:
# print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") # print(f"\tsupport claim {supported_claim_hash.hex()} {txout.value}")
self.effective_amount_changes[supported_claim_hash].append(txout.value) self.effective_amount_changes[supported_claim_hash].append(txout.value)
self.pending_supports[supported_claim_hash].add((tx_count, idx)) self.pending_supports[supported_claim_hash].add((tx_count, idx))
self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value
@ -481,42 +507,84 @@ class BlockProcessor:
supported_claim_hash, tx_count, idx, txout.value supported_claim_hash, tx_count, idx, txout.value
).get_add_support_utxo_ops() ).get_add_support_utxo_ops()
elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon: elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon:
if self.db.claim_exists(supported_claim_hash): # print(f"\tsupport claim {supported_claim_hash.hex()} {txout.value}")
_, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount( ops = []
supported_claim_hash if claim_info:
)
starting_amount = self.db.get_effective_amount(supported_claim_hash) starting_amount = self.db.get_effective_amount(supported_claim_hash)
if supported_claim_hash not in self.effective_amount_changes: if supported_claim_hash not in self.effective_amount_changes:
self.effective_amount_changes[supported_claim_hash].append(starting_amount) self.effective_amount_changes[supported_claim_hash].append(starting_amount)
self.effective_amount_changes[supported_claim_hash].append(txout.value) self.effective_amount_changes[supported_claim_hash].append(txout.value)
supported_amount = self._get_pending_effective_amount(supported_claim_hash)
if controlling_claim and supported_claim_hash != controlling_claim.claim_hash:
if supported_amount + txo.amount > self._get_pending_effective_amount(controlling_claim.claim_hash):
# takeover could happen
if (supported_name, supported_claim_hash) not in zero_delay_claims:
takeover_delay = get_delay_for_name(height - supported_activation_height)
if takeover_delay == 0:
zero_delay_claims[(supported_name, supported_claim_hash)] = (
supported_tx_num, supported_position
)
else:
ops.append(
RevertablePut(
*Prefixes.pending_activation.pack_item(
height + takeover_delay, supported_tx_num, supported_position,
supported_claim_hash, supported_name
)
)
)
self.pending_supports[supported_claim_hash].add((tx_count, idx)) self.pending_supports[supported_claim_hash].add((tx_count, idx))
self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value
# print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") # print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}")
return StagedClaimtrieSupport( ops.extend(StagedClaimtrieSupport(
supported_claim_hash, tx_count, idx, txout.value supported_claim_hash, tx_count, idx, txout.value
).get_add_support_utxo_ops() ).get_add_support_utxo_ops())
return ops
else: else:
print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}") print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}")
return [] return []
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script, def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script,
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]) -> List['RevertableOp']: spent_claims: typing.Dict[bytes, Tuple[int, int, str]],
zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']:
if script.is_claim_name or script.is_update_claim: if script.is_claim_name or script.is_update_claim:
return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims) return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims,
zero_delay_claims)
elif script.is_support_claim or script.is_support_claim_data: elif script.is_support_claim or script.is_support_claim_data:
return self._add_support(txo, txout, idx, tx_count) return self._add_support(height, txo, txout, idx, tx_count, zero_delay_claims)
return [] return []
def _spend_support(self, txin): def _remove_support(self, txin, zero_delay_claims):
txin_num = self.db.transaction_num_mapping[txin.prev_hash] txin_num = self.db.transaction_num_mapping[txin.prev_hash]
supported_name = None
if (txin_num, txin.prev_idx) in self.pending_support_txos: if (txin_num, txin.prev_idx) in self.pending_support_txos:
spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx)) spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx))
supported_name = self._get_pending_claim_name(spent_support)
self.pending_supports[spent_support].remove((txin_num, txin.prev_idx)) self.pending_supports[spent_support].remove((txin_num, txin.prev_idx))
else: else:
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
if spent_support:
supported_name = self._get_pending_claim_name(spent_support)
if spent_support and support_amount is not None and spent_support not in self.pending_abandon: if spent_support and support_amount is not None and spent_support not in self.pending_abandon:
# print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx})") controlling = self.db.get_controlling_claim(supported_name)
if controlling:
bid_queue = {
claim_hash: self._get_pending_effective_amount(claim_hash)
for claim_hash in self.db.get_claims_for_name(supported_name)
if claim_hash not in self.pending_abandon
}
bid_queue[spent_support] -= support_amount
sorted_claims = sorted(
list(bid_queue.keys()), key=lambda claim_hash: bid_queue[claim_hash], reverse=True
)
if controlling.claim_hash == spent_support and sorted_claims.index(controlling.claim_hash) > 0:
print("takeover due to abandoned support")
# print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx}) {supported_name}")
if spent_support not in self.effective_amount_changes: if spent_support not in self.effective_amount_changes:
assert spent_support not in self.pending_claims assert spent_support not in self.pending_claims
prev_effective_amount = self.db.get_effective_amount(spent_support) prev_effective_amount = self.db.get_effective_amount(spent_support)
@ -527,7 +595,7 @@ class BlockProcessor:
).get_spend_support_txo_ops() ).get_spend_support_txo_ops()
return [] return []
def _spend_claim(self, txin, spent_claims): def _remove_claim(self, txin, spent_claims, zero_delay_claims):
txin_num = self.db.transaction_num_mapping[txin.prev_hash] txin_num = self.db.transaction_num_mapping[txin.prev_hash]
if (txin_num, txin.prev_idx) in self.pending_claims: if (txin_num, txin.prev_idx) in self.pending_claims:
spent = self.pending_claims[(txin_num, txin.prev_idx)] spent = self.pending_claims[(txin_num, txin.prev_idx)]
@ -540,7 +608,7 @@ class BlockProcessor:
) )
if not spent_claim_hash_and_name: # txo is not a claim if not spent_claim_hash_and_name: # txo is not a claim
return [] return []
prev_claim_hash, txi_len_encoded_name = spent_claim_hash_and_name prev_claim_hash = spent_claim_hash_and_name.claim_hash
prev_signing_hash = self.db.get_channel_for_claim(prev_claim_hash) prev_signing_hash = self.db.get_channel_for_claim(prev_claim_hash)
prev_claims_in_channel_count = None prev_claims_in_channel_count = None
@ -551,8 +619,14 @@ class BlockProcessor:
prev_effective_amount = self.db.get_effective_amount( prev_effective_amount = self.db.get_effective_amount(
prev_claim_hash prev_claim_hash
) )
claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) k, v = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash)
activation_height = 0 claim_root_tx_num = v.root_tx_num
claim_root_idx = v.root_position
prev_amount = v.amount
name = v.name
tx_num = k.tx_num
position = k.position
activation_height = v.activation
height = bisect_right(self.db.tx_counts, tx_num) height = bisect_right(self.db.tx_counts, tx_num)
spent = StagedClaimtrieItem( spent = StagedClaimtrieItem(
name, prev_claim_hash, prev_amount, prev_effective_amount, name, prev_claim_hash, prev_amount, prev_effective_amount,
@ -564,23 +638,29 @@ class BlockProcessor:
if spent.claim_hash not in self.effective_amount_changes: if spent.claim_hash not in self.effective_amount_changes:
self.effective_amount_changes[spent.claim_hash].append(spent.effective_amount) self.effective_amount_changes[spent.claim_hash].append(spent.effective_amount)
self.effective_amount_changes[spent.claim_hash].append(-spent.amount) self.effective_amount_changes[spent.claim_hash].append(-spent.amount)
if (name, spent.claim_hash) in zero_delay_claims:
zero_delay_claims.pop((name, spent.claim_hash))
return spent.get_spend_claim_txo_ops() return spent.get_spend_claim_txo_ops()
def _spend_claim_or_support(self, txin, spent_claims): def _remove_claim_or_support(self, txin, spent_claims, zero_delay_claims):
spend_claim_ops = self._spend_claim(txin, spent_claims) spend_claim_ops = self._remove_claim(txin, spent_claims, zero_delay_claims)
if spend_claim_ops: if spend_claim_ops:
return spend_claim_ops return spend_claim_ops
return self._spend_support(txin) return self._remove_support(txin, zero_delay_claims)
def _abandon(self, spent_claims): def _abandon(self, spent_claims) -> typing.Tuple[List['RevertableOp'], typing.Set[str]]:
# Handle abandoned claims # Handle abandoned claims
ops = [] ops = []
controlling_claims = {}
need_takeover = set()
for abandoned_claim_hash, (prev_tx_num, prev_idx, name) in spent_claims.items(): for abandoned_claim_hash, (prev_tx_num, prev_idx, name) in spent_claims.items():
# print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {prev_tx_num} {prev_idx}") # print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {prev_tx_num} {prev_idx}")
if (prev_tx_num, prev_idx) in self.pending_claims: if (prev_tx_num, prev_idx) in self.pending_claims:
pending = self.pending_claims.pop((prev_tx_num, prev_idx)) pending = self.pending_claims.pop((prev_tx_num, prev_idx))
self.staged_pending_abandoned[pending.claim_hash] = pending
claim_root_tx_num = pending.root_claim_tx_num claim_root_tx_num = pending.root_claim_tx_num
claim_root_idx = pending.root_claim_tx_position claim_root_idx = pending.root_claim_tx_position
prev_amount = pending.amount prev_amount = pending.amount
@ -588,9 +668,12 @@ class BlockProcessor:
prev_effective_amount = pending.effective_amount prev_effective_amount = pending.effective_amount
prev_claims_in_channel_count = pending.claims_in_channel_count prev_claims_in_channel_count = pending.claims_in_channel_count
else: else:
claim_root_tx_num, claim_root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( k, v = self.db.get_root_claim_txo_and_current_amount(
abandoned_claim_hash abandoned_claim_hash
) )
claim_root_tx_num = v.root_tx_num
claim_root_idx = v.root_position
prev_amount = v.amount
prev_signing_hash = self.db.get_channel_for_claim(abandoned_claim_hash) prev_signing_hash = self.db.get_channel_for_claim(abandoned_claim_hash)
prev_claims_in_channel_count = None prev_claims_in_channel_count = None
if prev_signing_hash: if prev_signing_hash:
@ -601,6 +684,13 @@ class BlockProcessor:
abandoned_claim_hash abandoned_claim_hash
) )
if name not in controlling_claims:
controlling_claims[name] = self.db.get_controlling_claim(name)
controlling = controlling_claims[name]
if controlling and controlling.claim_hash == abandoned_claim_hash:
need_takeover.add(name)
# print("needs takeover")
for (support_tx_num, support_tx_idx) in self.pending_supports[abandoned_claim_hash]: for (support_tx_num, support_tx_idx) in self.pending_supports[abandoned_claim_hash]:
_, support_amount = self.pending_support_txos.pop((support_tx_num, support_tx_idx)) _, support_amount = self.pending_support_txos.pop((support_tx_num, support_tx_idx))
ops.extend( ops.extend(
@ -628,7 +718,7 @@ class BlockProcessor:
self.effective_amount_changes.pop(abandoned_claim_hash) self.effective_amount_changes.pop(abandoned_claim_hash)
self.pending_abandon.add(abandoned_claim_hash) self.pending_abandon.add(abandoned_claim_hash)
# print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}") # print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}, {len(need_takeover)} names need takeovers")
ops.extend( ops.extend(
StagedClaimtrieItem( StagedClaimtrieItem(
name, abandoned_claim_hash, prev_amount, prev_effective_amount, name, abandoned_claim_hash, prev_amount, prev_effective_amount,
@ -636,20 +726,120 @@ class BlockProcessor:
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
).get_abandon_ops(self.db.db) ).get_abandon_ops(self.db.db)
) )
return ops return ops, need_takeover
def _expire_claims(self, height: int): def _expire_claims(self, height: int, zero_delay_claims):
expired = self.db.get_expired_by_height(height) expired = self.db.get_expired_by_height(height)
spent_claims = {} spent_claims = {}
ops = [] ops = []
names_needing_takeover = set()
for expired_claim_hash, (tx_num, position, name, txi) in expired.items(): for expired_claim_hash, (tx_num, position, name, txi) in expired.items():
if (tx_num, position) not in self.pending_claims: if (tx_num, position) not in self.pending_claims:
ops.extend(self._spend_claim(txi, spent_claims)) ops.extend(self._remove_claim(txi, spent_claims, zero_delay_claims))
if expired: if expired:
# do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned
abandon_ops, _names_needing_takeover = self._abandon(spent_claims)
if abandon_ops:
ops.extend(abandon_ops)
names_needing_takeover.update(_names_needing_takeover)
ops.extend(self._abandon(spent_claims)) ops.extend(self._abandon(spent_claims))
return ops, names_needing_takeover
def _get_pending_claim_amount(self, claim_hash: bytes) -> int:
if claim_hash in self.pending_claim_txos:
return self.pending_claims[self.pending_claim_txos[claim_hash]].amount
return self.db.get_claim_amount(claim_hash)
def _get_pending_claim_name(self, claim_hash: bytes) -> str:
assert claim_hash is not None
if claim_hash in self.pending_claims:
return self.pending_claims[claim_hash].name
claim = self.db.get_claim_from_txo(claim_hash)
return claim.name
def _get_pending_effective_amount(self, claim_hash: bytes) -> int:
claim_amount = self._get_pending_claim_amount(claim_hash) or 0
support_amount = self.db.get_support_amount(claim_hash) or 0
return claim_amount + support_amount + sum(
self.pending_support_txos[support_txnum, support_n][1]
for (support_txnum, support_n) in self.pending_supports.get(claim_hash, [])
) # TODO: subtract pending spend supports
def _get_name_takeover_ops(self, height: int, name: str,
activated_claims: typing.Set[bytes]) -> List['RevertableOp']:
controlling = self.db.get_controlling_claim(name)
if not controlling or controlling.claim_hash in self.pending_abandon:
# print("no controlling claim for ", name)
bid_queue = {
claim_hash: self._get_pending_effective_amount(claim_hash) for claim_hash in activated_claims
}
winning_claim = max(bid_queue, key=lambda k: bid_queue[k])
if winning_claim in self.pending_claim_txos:
s = self.pending_claims[self.pending_claim_txos[winning_claim]]
else:
s = self.db.make_staged_claim_item(winning_claim)
ops = []
if s.activation_height > height:
ops.extend(get_force_activate_ops(
name, s.tx_num, s.position, s.claim_hash, s.root_claim_tx_num, s.root_claim_tx_position,
s.amount, s.effective_amount, s.activation_height, height
))
ops.extend(get_takeover_name_ops(name, winning_claim, height))
return ops
else:
# print(f"current controlling claim for {name}#{controlling.claim_hash.hex()}")
controlling_effective_amount = self._get_pending_effective_amount(controlling.claim_hash)
bid_queue = {
claim_hash: self._get_pending_effective_amount(claim_hash) for claim_hash in activated_claims
}
highest_newly_activated = max(bid_queue, key=lambda k: bid_queue[k])
if bid_queue[highest_newly_activated] > controlling_effective_amount:
# print(f"takeover controlling claim for {name}#{controlling.claim_hash.hex()}")
return get_takeover_name_ops(name, highest_newly_activated, height, controlling)
print(bid_queue[highest_newly_activated], controlling_effective_amount)
# print("no takeover")
return []
def _get_takeover_ops(self, height: int, zero_delay_claims) -> List['RevertableOp']:
ops = []
pending = defaultdict(set)
# get non delayed takeovers for new names
for (name, claim_hash) in zero_delay_claims:
if claim_hash not in self.pending_abandon:
pending[name].add(claim_hash)
# print("zero delay activate", name, claim_hash.hex())
# get takeovers from claims activated at this block
for activated in self.db.get_activated_claims_at_height(height):
if activated.claim_hash not in self.pending_abandon:
pending[activated.name].add(activated.claim_hash)
# print("delayed activate")
# get takeovers from supports for controlling claims being abandoned
for abandoned_claim_hash in self.pending_abandon:
if abandoned_claim_hash in self.staged_pending_abandoned:
abandoned = self.staged_pending_abandoned[abandoned_claim_hash]
controlling = self.db.get_controlling_claim(abandoned.name)
if controlling and controlling.claim_hash == abandoned_claim_hash and abandoned.name not in pending:
pending[abandoned.name].update(self.db.get_claims_for_name(abandoned.name))
else:
k, v = self.db.get_root_claim_txo_and_current_amount(abandoned_claim_hash)
controlling_claim = self.db.get_controlling_claim(v.name)
if controlling_claim and abandoned_claim_hash == controlling_claim.claim_hash and v.name not in pending:
pending[v.name].update(self.db.get_claims_for_name(v.name))
# print("check abandoned winning")
# get takeovers from controlling claims being abandoned
for name, activated_claims in pending.items():
ops.extend(self._get_name_takeover_ops(height, name, activated_claims))
return ops return ops
def advance_block(self, block): def advance_block(self, block):
# print("advance ", height)
height = self.height + 1 height = self.height + 1
txs: List[Tuple[Tx, bytes]] = block.transactions txs: List[Tuple[Tx, bytes]] = block.transactions
block_hash = self.coin.header_hash(block.header) block_hash = self.coin.header_hash(block.header)
@ -672,7 +862,8 @@ class BlockProcessor:
append_hashX_by_tx = hashXs_by_tx.append append_hashX_by_tx = hashXs_by_tx.append
hashX_from_script = self.coin.hashX_from_script hashX_from_script = self.coin.hashX_from_script
# unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]] = {}
abandoned_or_expired_controlling = set()
for tx, tx_hash in txs: for tx, tx_hash in txs:
spent_claims = {} spent_claims = {}
@ -690,7 +881,7 @@ class BlockProcessor:
undo_info_append(cache_value) undo_info_append(cache_value)
append_hashX(cache_value[:-12]) append_hashX(cache_value[:-12])
spend_claim_or_support_ops = self._spend_claim_or_support(txin, spent_claims) spend_claim_or_support_ops = self._remove_claim_or_support(txin, spent_claims, zero_delay_claims)
if spend_claim_or_support_ops: if spend_claim_or_support_ops:
claimtrie_stash_extend(spend_claim_or_support_ops) claimtrie_stash_extend(spend_claim_or_support_ops)
@ -708,15 +899,16 @@ class BlockProcessor:
txo = Output(txout.value, script) txo = Output(txout.value, script)
claim_or_support_ops = self._add_claim_or_support( claim_or_support_ops = self._add_claim_or_support(
height, tx_hash, tx_count, idx, txo, txout, script, spent_claims height, tx_hash, tx_count, idx, txo, txout, script, spent_claims, zero_delay_claims
) )
if claim_or_support_ops: if claim_or_support_ops:
claimtrie_stash_extend(claim_or_support_ops) claimtrie_stash_extend(claim_or_support_ops)
# Handle abandoned claims # Handle abandoned claims
abandon_ops = self._abandon(spent_claims) abandon_ops, abandoned_controlling_need_takeover = self._abandon(spent_claims)
if abandon_ops: if abandon_ops:
claimtrie_stash_extend(abandon_ops) claimtrie_stash_extend(abandon_ops)
abandoned_or_expired_controlling.update(abandoned_controlling_need_takeover)
append_hashX_by_tx(hashXs) append_hashX_by_tx(hashXs)
update_touched(hashXs) update_touched(hashXs)
@ -725,11 +917,17 @@ class BlockProcessor:
tx_count += 1 tx_count += 1
# handle expired claims # handle expired claims
expired_ops = self._expire_claims(height) expired_ops, expired_need_takeover = self._expire_claims(height, zero_delay_claims)
if expired_ops: if expired_ops:
print(f"************\nexpire claims at block {height}\n************") # print(f"************\nexpire claims at block {height}\n************")
abandoned_or_expired_controlling.update(expired_need_takeover)
claimtrie_stash_extend(expired_ops) claimtrie_stash_extend(expired_ops)
# activate claims and process takeovers
takeover_ops = self._get_takeover_ops(height, zero_delay_claims)
if takeover_ops:
claimtrie_stash_extend(takeover_ops)
# self.db.add_unflushed(hashXs_by_tx, self.tx_count) # self.db.add_unflushed(hashXs_by_tx, self.tx_count)
_unflushed = self.db.hist_unflushed _unflushed = self.db.hist_unflushed
_count = 0 _count = 0
@ -789,7 +987,6 @@ class BlockProcessor:
coin = self.coin coin = self.coin
for raw_block in raw_blocks: for raw_block in raw_blocks:
self.logger.info("backup block %i", self.height) self.logger.info("backup block %i", self.height)
print("backup", self.height)
# Check and update self.tip # Check and update self.tip
block = coin.block(raw_block, self.height) block = coin.block(raw_block, self.height)
header_hash = coin.header_hash(block.header) header_hash = coin.header_hash(block.header)

View file

@ -15,6 +15,9 @@ class DB_PREFIXES(enum.Enum):
claim_effective_amount_prefix = b'D' claim_effective_amount_prefix = b'D'
claim_expiration = b'O' claim_expiration = b'O'
claim_takeover = b'P'
pending_activation = b'Q'
undo_claimtrie = b'M' undo_claimtrie = b'M'
HISTORY_PREFIX = b'A' HISTORY_PREFIX = b'A'

View file

@ -2,7 +2,7 @@ import typing
from typing import Optional from typing import Optional
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue
nOriginalClaimExpirationTime = 262974 nOriginalClaimExpirationTime = 262974
nExtendedClaimExpirationTime = 2102400 nExtendedClaimExpirationTime = 2102400
@ -13,6 +13,12 @@ nMaxTakeoverWorkaroundHeight = 658300 # targeting 30 Oct 2019
nWitnessForkHeight = 680770 # targeting 11 Dec 2019 nWitnessForkHeight = 680770 # targeting 11 Dec 2019
nAllClaimsInMerkleForkHeight = 658310 # targeting 30 Oct 2019 nAllClaimsInMerkleForkHeight = 658310 # targeting 30 Oct 2019
proportionalDelayFactor = 32 proportionalDelayFactor = 32
maxTakeoverDelay = 4032
def get_delay_for_name(blocks_of_continuous_ownership: int) -> int:
return min(blocks_of_continuous_ownership // proportionalDelayFactor, maxTakeoverDelay)
def get_expiration_height(last_updated_height: int) -> int: def get_expiration_height(last_updated_height: int) -> int:
if last_updated_height < nExtendedClaimExpirationForkHeight: if last_updated_height < nExtendedClaimExpirationForkHeight:
@ -57,18 +63,21 @@ class StagedClaimtrieSupport(typing.NamedTuple):
def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_effective_amount: int, tx_num: int, def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_effective_amount: int, tx_num: int,
position: int, root_tx_num: int, root_position: int, claim_hash: bytes, position: int, root_tx_num: int, root_position: int, claim_hash: bytes,
activation_height: int, prev_activation_height: int,
signing_hash: Optional[bytes] = None, signing_hash: Optional[bytes] = None,
claims_in_channel_count: Optional[int] = None): claims_in_channel_count: Optional[int] = None):
assert root_position != root_tx_num, f"{tx_num} {position} {root_tx_num} {root_tx_num}" assert root_position != root_tx_num, f"{tx_num} {position} {root_tx_num} {root_tx_num}"
ops = [ ops = [
RevertableDelete( RevertableDelete(
*Prefixes.claim_effective_amount.pack_item( *Prefixes.claim_effective_amount.pack_item(
name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position,
prev_activation_height
) )
), ),
RevertablePut( RevertablePut(
*Prefixes.claim_effective_amount.pack_item( *Prefixes.claim_effective_amount.pack_item(
name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position,
activation_height
) )
) )
] ]
@ -88,6 +97,89 @@ def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_e
return ops return ops
def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int,
previous_winning: Optional[ClaimTakeoverValue] = None):
if previous_winning:
# print(f"takeover previously owned {name} - {claim_hash.hex()} at {takeover_height}")
return [
RevertableDelete(
*Prefixes.claim_takeover.pack_item(
name, previous_winning.claim_hash, previous_winning.height
)
),
RevertablePut(
*Prefixes.claim_takeover.pack_item(
name, claim_hash, takeover_height
)
)
]
# print(f"takeover {name} - {claim_hash[::-1].hex()} at {takeover_height}")
return [
RevertablePut(
*Prefixes.claim_takeover.pack_item(
name, claim_hash, takeover_height
)
)
]
def get_force_activate_ops(name: str, tx_num: int, position: int, claim_hash: bytes, root_claim_tx_num: int,
root_claim_tx_position: int, amount: int, effective_amount: int,
prev_activation_height: int, new_activation_height: int):
return [
# delete previous
RevertableDelete(
*Prefixes.claim_effective_amount.pack_item(
name, effective_amount, tx_num, position, claim_hash,
root_claim_tx_num, root_claim_tx_position, prev_activation_height
)
),
RevertableDelete(
*Prefixes.claim_to_txo.pack_item(
claim_hash, tx_num, position, root_claim_tx_num, root_claim_tx_position,
amount, prev_activation_height, name
)
),
RevertableDelete(
*Prefixes.claim_short_id.pack_item(
name, claim_hash, root_claim_tx_num, root_claim_tx_position, tx_num,
position, prev_activation_height
)
),
RevertableDelete(
*Prefixes.pending_activation.pack_item(
prev_activation_height, tx_num, position, claim_hash, name
)
),
# insert new
RevertablePut(
*Prefixes.claim_effective_amount.pack_item(
name, effective_amount, tx_num, position, claim_hash,
root_claim_tx_num, root_claim_tx_position, new_activation_height
)
),
RevertablePut(
*Prefixes.claim_to_txo.pack_item(
claim_hash, tx_num, position, root_claim_tx_num, root_claim_tx_position,
amount, new_activation_height, name
)
),
RevertablePut(
*Prefixes.claim_short_id.pack_item(
name, claim_hash, root_claim_tx_num, root_claim_tx_position, tx_num,
position, new_activation_height
)
),
RevertablePut(
*Prefixes.pending_activation.pack_item(
new_activation_height, tx_num, position, claim_hash, name
)
)
]
class StagedClaimtrieItem(typing.NamedTuple): class StagedClaimtrieItem(typing.NamedTuple):
name: str name: str
claim_hash: bytes claim_hash: bytes
@ -119,21 +211,21 @@ class StagedClaimtrieItem(typing.NamedTuple):
op( op(
*Prefixes.claim_effective_amount.pack_item( *Prefixes.claim_effective_amount.pack_item(
self.name, self.effective_amount, self.tx_num, self.position, self.claim_hash, self.name, self.effective_amount, self.tx_num, self.position, self.claim_hash,
self.root_claim_tx_num, self.root_claim_tx_position self.root_claim_tx_num, self.root_claim_tx_position, self.activation_height
) )
), ),
# claim tip by claim hash # claim tip by claim hash
op( op(
*Prefixes.claim_to_txo.pack_item( *Prefixes.claim_to_txo.pack_item(
self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position, self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position,
self.amount, self.name self.amount, self.activation_height, self.name
) )
), ),
# short url resolution # short url resolution
op( op(
*Prefixes.claim_short_id.pack_item( *Prefixes.claim_short_id.pack_item(
self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num, self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num,
self.position self.position, self.activation_height
) )
), ),
# claim hash by txo # claim hash by txo
@ -146,6 +238,12 @@ class StagedClaimtrieItem(typing.NamedTuple):
self.expiration_height, self.tx_num, self.position, self.claim_hash, self.expiration_height, self.tx_num, self.position, self.claim_hash,
self.name self.name
) )
),
# claim activation
op(
*Prefixes.pending_activation.pack_item(
self.activation_height, self.tx_num, self.position, self.claim_hash, self.name
)
) )
] ]
if self.signing_hash and self.claims_in_channel_count is not None: if self.signing_hash and self.claims_in_channel_count is not None:
@ -187,4 +285,3 @@ class StagedClaimtrieItem(typing.NamedTuple):
delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash) delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash)
invalidate_channel_ops = self.get_invalidate_channel_ops(db) invalidate_channel_ops = self.get_invalidate_channel_ops(db)
return delete_short_id_ops + delete_claim_ops + delete_supports_ops + invalidate_channel_ops return delete_short_id_ops + delete_claim_ops + delete_supports_ops + invalidate_channel_ops

View file

@ -46,6 +46,7 @@ class EffectiveAmountValue(typing.NamedTuple):
claim_hash: bytes claim_hash: bytes
root_tx_num: int root_tx_num: int
root_position: int root_position: int
activation: int
class ClaimToTXOKey(typing.NamedTuple): class ClaimToTXOKey(typing.NamedTuple):
@ -58,6 +59,7 @@ class ClaimToTXOValue(typing.NamedTuple):
root_tx_num: int root_tx_num: int
root_position: int root_position: int
amount: int amount: int
activation: int
name: str name: str
@ -81,6 +83,7 @@ class ClaimShortIDKey(typing.NamedTuple):
class ClaimShortIDValue(typing.NamedTuple): class ClaimShortIDValue(typing.NamedTuple):
tx_num: int tx_num: int
position: int position: int
activation: int
class ClaimToChannelKey(typing.NamedTuple): class ClaimToChannelKey(typing.NamedTuple):
@ -134,10 +137,30 @@ class ClaimExpirationValue(typing.NamedTuple):
name: str name: str
class ClaimTakeoverKey(typing.NamedTuple):
name: str
class ClaimTakeoverValue(typing.NamedTuple):
claim_hash: bytes
height: int
class PendingActivationKey(typing.NamedTuple):
height: int
tx_num: int
position: int
class PendingActivationValue(typing.NamedTuple):
claim_hash: bytes
name: str
class EffectiveAmountPrefixRow(PrefixRow): class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_effective_amount_prefix.value prefix = DB_PREFIXES.claim_effective_amount_prefix.value
key_struct = struct.Struct(b'>QLH') key_struct = struct.Struct(b'>QLH')
value_struct = struct.Struct(b'>20sLH') value_struct = struct.Struct(b'>20sLHL')
@classmethod @classmethod
def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int): def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int):
@ -160,20 +183,20 @@ class EffectiveAmountPrefixRow(PrefixRow):
return EffectiveAmountValue(*super().unpack_value(data)) return EffectiveAmountValue(*super().unpack_value(data))
@classmethod @classmethod
def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int) -> bytes: def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int, activation: int) -> bytes:
return super().pack_value(claim_hash, root_tx_num, root_position) return super().pack_value(claim_hash, root_tx_num, root_position, activation)
@classmethod @classmethod
def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes, def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes,
root_tx_num: int, root_position: int): root_tx_num: int, root_position: int, activation: int):
return cls.pack_key(name, effective_amount, tx_num, position), \ return cls.pack_key(name, effective_amount, tx_num, position), \
cls.pack_value(claim_hash, root_tx_num, root_position) cls.pack_value(claim_hash, root_tx_num, root_position, activation)
class ClaimToTXOPrefixRow(PrefixRow): class ClaimToTXOPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_to_txo.value prefix = DB_PREFIXES.claim_to_txo.value
key_struct = struct.Struct(b'>20sLH') key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>LHQ') value_struct = struct.Struct(b'>LHQL')
@classmethod @classmethod
def pack_key(cls, claim_hash: bytes, tx_num: int, position: int): def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
@ -191,20 +214,20 @@ class ClaimToTXOPrefixRow(PrefixRow):
@classmethod @classmethod
def unpack_value(cls, data: bytes) -> ClaimToTXOValue: def unpack_value(cls, data: bytes) -> ClaimToTXOValue:
root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14]) root_tx_num, root_position, amount, activation = cls.value_struct.unpack(data[:18])
name_len = int.from_bytes(data[14:16], byteorder='big') name_len = int.from_bytes(data[18:20], byteorder='big')
name = data[16:16 + name_len].decode() name = data[20:20 + name_len].decode()
return ClaimToTXOValue(root_tx_num, root_position, amount, name) return ClaimToTXOValue(root_tx_num, root_position, amount, activation, name)
@classmethod @classmethod
def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes: def pack_value(cls, root_tx_num: int, root_position: int, amount: int, activation: int, name: str) -> bytes:
return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name) return cls.value_struct.pack(root_tx_num, root_position, amount, activation) + length_encoded_name(name)
@classmethod @classmethod
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int, def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int,
amount: int, name: str): amount: int, activation: int, name: str):
return cls.pack_key(claim_hash, tx_num, position), \ return cls.pack_key(claim_hash, tx_num, position), \
cls.pack_value(root_tx_num, root_position, amount, name) cls.pack_value(root_tx_num, root_position, amount, activation, name)
class TXOToClaimPrefixRow(PrefixRow): class TXOToClaimPrefixRow(PrefixRow):
@ -240,15 +263,15 @@ class TXOToClaimPrefixRow(PrefixRow):
class ClaimShortIDPrefixRow(PrefixRow): class ClaimShortIDPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_short_id_prefix.value prefix = DB_PREFIXES.claim_short_id_prefix.value
key_struct = struct.Struct(b'>20sLH') key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>LH') value_struct = struct.Struct(b'>LHL')
@classmethod @classmethod
def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int): def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int):
return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position) return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position)
@classmethod @classmethod
def pack_value(cls, tx_num: int, position: int): def pack_value(cls, tx_num: int, position: int, activation: int):
return super().pack_value(tx_num, position) return super().pack_value(tx_num, position, activation)
@classmethod @classmethod
def unpack_key(cls, key: bytes) -> ClaimShortIDKey: def unpack_key(cls, key: bytes) -> ClaimShortIDKey:
@ -263,9 +286,9 @@ class ClaimShortIDPrefixRow(PrefixRow):
@classmethod @classmethod
def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int, def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int,
tx_num: int, position: int): tx_num: int, position: int, activation: int):
return cls.pack_key(name, claim_hash, root_tx_num, root_position), \ return cls.pack_key(name, claim_hash, root_tx_num, root_position), \
cls.pack_value(tx_num, position) cls.pack_value(tx_num, position, activation)
class ClaimToChannelPrefixRow(PrefixRow): class ClaimToChannelPrefixRow(PrefixRow):
@ -418,6 +441,63 @@ class ClaimExpirationPrefixRow(PrefixRow):
return cls.unpack_key(key), cls.unpack_value(value) return cls.unpack_key(key), cls.unpack_value(value)
class ClaimTakeoverPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_takeover.value
value_struct = struct.Struct(b'>20sL')
@classmethod
def pack_key(cls, name: str):
return cls.prefix + length_encoded_name(name)
@classmethod
def pack_value(cls, claim_hash: bytes, takeover_height: int):
return super().pack_value(claim_hash, takeover_height)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimTakeoverKey:
assert key[:1] == cls.prefix
name_len = int.from_bytes(key[1:3], byteorder='big')
name = key[3:3 + name_len].decode()
return ClaimTakeoverKey(name)
@classmethod
def unpack_value(cls, data: bytes) -> ClaimTakeoverValue:
return ClaimTakeoverValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, name: str, claim_hash: bytes, takeover_height: int):
return cls.pack_key(name), cls.pack_value(claim_hash, takeover_height)
class PendingClaimActivationPrefixRow(PrefixRow):
prefix = DB_PREFIXES.pending_activation.value
key_struct = struct.Struct(b'>LLH')
@classmethod
def pack_key(cls, height: int, tx_num: int, position: int):
return super().pack_key(height, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> PendingActivationKey:
return PendingActivationKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
return claim_hash + length_encoded_name(name)
@classmethod
def unpack_value(cls, data: bytes) -> PendingActivationValue:
claim_hash = data[:20]
name_len = int.from_bytes(data[20:22], byteorder='big')
name = data[22:22 + name_len].decode()
return PendingActivationValue(claim_hash, name)
@classmethod
def pack_item(cls, height: int, tx_num: int, position: int, claim_hash: bytes, name: str):
return cls.pack_key(height, tx_num, position), \
cls.pack_value(claim_hash, name)
class Prefixes: class Prefixes:
claim_to_support = ClaimToSupportPrefixRow claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow support_to_claim = SupportToClaimPrefixRow
@ -432,4 +512,7 @@ class Prefixes:
claim_effective_amount = EffectiveAmountPrefixRow claim_effective_amount = EffectiveAmountPrefixRow
claim_expiration = ClaimExpirationPrefixRow claim_expiration = ClaimExpirationPrefixRow
claim_takeover = ClaimTakeoverPrefixRow
pending_activation = PendingClaimActivationPrefixRow
# undo_claimtrie = b'M' # undo_claimtrie = b'M'

View file

@ -16,6 +16,8 @@ import time
import typing import typing
import struct import struct
import attr import attr
import zlib
import base64
from typing import Optional, Iterable from typing import Optional, Iterable
from functools import partial from functools import partial
from asyncio import sleep from asyncio import sleep
@ -34,9 +36,10 @@ from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_ui
from lbry.wallet.server.storage import db_class from lbry.wallet.server.storage import db_class
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
from lbry.wallet.server.db.claimtrie import get_expiration_height from lbry.wallet.server.db.claimtrie import get_expiration_height, get_delay_for_name
from lbry.wallet.server.db.elasticsearch import SearchIndex from lbry.wallet.server.db.elasticsearch import SearchIndex
@ -79,7 +82,7 @@ class FlushData:
adds = attr.ib() adds = attr.ib()
deletes = attr.ib() deletes = attr.ib()
tip = attr.ib() tip = attr.ib()
undo_claimtrie = attr.ib() undo = attr.ib()
class ResolveResult(typing.NamedTuple): class ResolveResult(typing.NamedTuple):
@ -89,6 +92,7 @@ class ResolveResult(typing.NamedTuple):
position: int position: int
tx_hash: bytes tx_hash: bytes
height: int height: int
amount: int
short_url: str short_url: str
is_controlling: bool is_controlling: bool
canonical_url: str canonical_url: str
@ -97,12 +101,14 @@ class ResolveResult(typing.NamedTuple):
expiration_height: int expiration_height: int
effective_amount: int effective_amount: int
support_amount: int support_amount: int
last_take_over_height: Optional[int] last_takeover_height: Optional[int]
claims_in_channel: Optional[int] claims_in_channel: Optional[int]
channel_hash: Optional[bytes] channel_hash: Optional[bytes]
reposted_claim_hash: Optional[bytes] reposted_claim_hash: Optional[bytes]
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll') DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')
DB_STATE_STRUCT_SIZE = 92 DB_STATE_STRUCT_SIZE = 92
@ -183,12 +189,10 @@ class LevelDB:
self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout) self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout)
def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int): def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int):
claim_hash_and_name = self.db.get( claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
DB_PREFIXES.txo_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx)
)
if not claim_hash_and_name: if not claim_hash_and_name:
return return
return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:] return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name)
def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]: def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]:
key = Prefixes.support_to_claim.pack_key(tx_num, position) key = Prefixes.support_to_claim.pack_key(tx_num, position)
@ -213,20 +217,22 @@ class LevelDB:
unpacked_k = Prefixes.claim_to_support.unpack_key(k) unpacked_k = Prefixes.claim_to_support.unpack_key(k)
unpacked_v = Prefixes.claim_to_support.unpack_value(v) unpacked_v = Prefixes.claim_to_support.unpack_value(v)
supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount)) supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount))
return supports return supports
def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, root_tx_num: int, def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, root_tx_num: int,
root_position: int) -> ResolveResult: root_position: int, activation_height: int) -> ResolveResult:
controlling_claim = self.get_controlling_claim(name)
tx_hash = self.total_transactions[tx_num] tx_hash = self.total_transactions[tx_num]
height = bisect_right(self.tx_counts, tx_num) height = bisect_right(self.tx_counts, tx_num)
created_height = bisect_right(self.tx_counts, root_tx_num) created_height = bisect_right(self.tx_counts, root_tx_num)
last_take_over_height = 0 last_take_over_height = controlling_claim.height
activation_height = created_height
expiration_height = get_expiration_height(height) expiration_height = get_expiration_height(height)
support_amount = self.get_support_amount(claim_hash) support_amount = self.get_support_amount(claim_hash)
effective_amount = self.get_effective_amount(claim_hash) claim_amount = self.get_claim_txo_amount(claim_hash, tx_num, position)
effective_amount = support_amount + claim_amount
channel_hash = self.get_channel_for_claim(claim_hash) channel_hash = self.get_channel_for_claim(claim_hash)
claims_in_channel = None claims_in_channel = None
@ -235,27 +241,35 @@ class LevelDB:
if channel_hash: if channel_hash:
channel_vals = self.get_root_claim_txo_and_current_amount(channel_hash) channel_vals = self.get_root_claim_txo_and_current_amount(channel_hash)
if channel_vals: if channel_vals:
_, _, _, channel_name, _, _ = channel_vals channel_name = channel_vals[1].name
claims_in_channel = self.get_claims_in_channel_count(channel_hash) claims_in_channel = self.get_claims_in_channel_count(channel_hash)
canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}' canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}'
return ResolveResult( return ResolveResult(
name, claim_hash, tx_num, position, tx_hash, height, short_url=short_url, name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url,
is_controlling=False, canonical_url=canonical_url, last_take_over_height=last_take_over_height, is_controlling=controlling_claim.claim_hash == claim_hash, canonical_url=canonical_url,
claims_in_channel=claims_in_channel, creation_height=created_height, activation_height=activation_height, last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel,
creation_height=created_height, activation_height=activation_height,
expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount,
channel_hash=channel_hash, reposted_claim_hash=None channel_hash=channel_hash, reposted_claim_hash=None
) )
def _resolve(self, normalized_name: str, claim_id: Optional[str] = None, def _resolve(self, normalized_name: str, claim_id: Optional[str] = None,
amount_order: int = 1) -> Optional[ResolveResult]: amount_order: Optional[int] = None) -> Optional[ResolveResult]:
""" """
:param normalized_name: name :param normalized_name: name
:param claim_id: partial or complete claim id :param claim_id: partial or complete claim id
:param amount_order: '$<value>' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided :param amount_order: '$<value>' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided
""" """
if not amount_order and not claim_id:
# winning resolution
controlling = self.get_controlling_claim(normalized_name)
if not controlling:
return
return self._fs_get_claim_by_hash(controlling.claim_hash)
encoded_name = length_encoded_name(normalized_name) encoded_name = length_encoded_name(normalized_name)
amount_order = max(int(amount_order or 1), 1) amount_order = max(int(amount_order or 1), 1)
if claim_id: if claim_id:
# resolve by partial/complete claim id # resolve by partial/complete claim id
short_claim_hash = bytes.fromhex(claim_id) short_claim_hash = bytes.fromhex(claim_id)
@ -263,19 +277,22 @@ class LevelDB:
for k, v in self.db.iterator(prefix=prefix): for k, v in self.db.iterator(prefix=prefix):
key = Prefixes.claim_short_id.unpack_key(k) key = Prefixes.claim_short_id.unpack_key(k)
claim_txo = Prefixes.claim_short_id.unpack_value(v) claim_txo = Prefixes.claim_short_id.unpack_value(v)
return self._prepare_resolve_result(claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name, return self._prepare_resolve_result(
key.root_tx_num, key.root_position) claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name, key.root_tx_num,
key.root_position, claim_txo.activation
)
return return
# resolve by amount ordering, 1 indexed # resolve by amount ordering, 1 indexed
for idx, (k, v) in enumerate(self.db.iterator(prefix=DB_PREFIXES.claim_effective_amount_prefix.value + encoded_name)): for idx, (k, v) in enumerate(self.db.iterator(
prefix=DB_PREFIXES.claim_effective_amount_prefix.value + encoded_name)):
if amount_order > idx + 1: if amount_order > idx + 1:
continue continue
key = Prefixes.claim_effective_amount.unpack_key(k) key = Prefixes.claim_effective_amount.unpack_key(k)
claim_val = Prefixes.claim_effective_amount.unpack_value(v) claim_val = Prefixes.claim_effective_amount.unpack_value(v)
return self._prepare_resolve_result( return self._prepare_resolve_result(
key.tx_num, key.position, claim_val.claim_hash, key.name, claim_val.root_tx_num, key.tx_num, key.position, claim_val.claim_hash, key.name, claim_val.root_tx_num,
claim_val.root_position claim_val.root_position, claim_val.activation
) )
return return
@ -293,7 +310,7 @@ class LevelDB:
return return
return list(sorted(candidates, key=lambda item: item[1]))[0] return list(sorted(candidates, key=lambda item: item[1]))[0]
def _fs_resolve(self, url): def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]:
try: try:
parsed = URL.parse(url) parsed = URL.parse(url)
except ValueError as e: except ValueError as e:
@ -326,7 +343,7 @@ class LevelDB:
return resolved_stream, resolved_channel return resolved_stream, resolved_channel
async def fs_resolve(self, url): async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]:
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url) return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url)
def _fs_get_claim_by_hash(self, claim_hash): def _fs_get_claim_by_hash(self, claim_hash):
@ -335,7 +352,7 @@ class LevelDB:
unpacked_v = Prefixes.claim_to_txo.unpack_value(v) unpacked_v = Prefixes.claim_to_txo.unpack_value(v)
return self._prepare_resolve_result( return self._prepare_resolve_result(
unpacked_k.tx_num, unpacked_k.position, unpacked_k.claim_hash, unpacked_v.name, unpacked_k.tx_num, unpacked_k.position, unpacked_k.claim_hash, unpacked_v.name,
unpacked_v.root_tx_num, unpacked_v.root_position unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.activation
) )
async def fs_getclaimbyid(self, claim_id): async def fs_getclaimbyid(self, claim_id):
@ -352,17 +369,21 @@ class LevelDB:
for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash): for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash):
unpacked_k = Prefixes.claim_to_txo.unpack_key(k) unpacked_k = Prefixes.claim_to_txo.unpack_key(k)
unpacked_v = Prefixes.claim_to_txo.unpack_value(v) unpacked_v = Prefixes.claim_to_txo.unpack_value(v)
return unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.amount, unpacked_v.name,\ return unpacked_k, unpacked_v
unpacked_k.tx_num, unpacked_k.position
def make_staged_claim_item(self, claim_hash: bytes) -> StagedClaimtrieItem: def make_staged_claim_item(self, claim_hash: bytes) -> Optional[StagedClaimtrieItem]:
root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount( claim_info = self.get_root_claim_txo_and_current_amount(claim_hash)
claim_hash k, v = claim_info
) root_tx_num = v.root_tx_num
root_idx = v.root_position
value = v.amount
name = v.name
tx_num = k.tx_num
idx = k.position
height = bisect_right(self.tx_counts, tx_num) height = bisect_right(self.tx_counts, tx_num)
effective_amount = self.db.get_support_amount(claim_hash) + value effective_amount = self.get_support_amount(claim_hash) + value
signing_hash = self.get_channel_for_claim(claim_hash) signing_hash = self.get_channel_for_claim(claim_hash)
activation_height = 0 activation_height = v.activation
if signing_hash: if signing_hash:
count = self.get_claims_in_channel_count(signing_hash) count = self.get_claims_in_channel_count(signing_hash)
else: else:
@ -372,17 +393,36 @@ class LevelDB:
root_tx_num, root_idx, signing_hash, count root_tx_num, root_idx, signing_hash, count
) )
def get_effective_amount(self, claim_hash): def get_claim_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]:
v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash, tx_num, position))
if v:
return Prefixes.claim_to_txo.unpack_value(v).amount
def get_claim_from_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
assert claim_hash
for v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_key=False): for v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_key=False):
return Prefixes.claim_to_txo.unpack_value(v).amount + self.get_support_amount(claim_hash) return Prefixes.claim_to_txo.unpack_value(v)
fnord
return None def get_claim_amount(self, claim_hash: bytes) -> Optional[int]:
claim = self.get_claim_from_txo(claim_hash)
if claim:
return claim.amount
def get_effective_amount(self, claim_hash: bytes):
return (self.get_claim_amount(claim_hash) or 0) + self.get_support_amount(claim_hash)
def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: int): def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: int):
claim_info = self.get_root_claim_txo_and_current_amount(claim_hash) claim_info = self.get_root_claim_txo_and_current_amount(claim_hash)
if not claim_info: if not claim_info:
return [] return []
root_tx_num, root_position, amount, name, tx_num, position = claim_info
root_tx_num = claim_info[1].root_tx_num
root_position = claim_info[1].root_position
amount = claim_info[1].amount
name = claim_info[1].name
tx_num = claim_info[0].tx_num
position = claim_info[0].position
activation = claim_info[1].activation
signing_hash = self.get_channel_for_claim(claim_hash) signing_hash = self.get_channel_for_claim(claim_hash)
claims_in_channel_count = None claims_in_channel_count = None
if signing_hash: if signing_hash:
@ -390,7 +430,8 @@ class LevelDB:
prev_effective_amount = self.get_effective_amount(claim_hash) prev_effective_amount = self.get_effective_amount(claim_hash)
return get_update_effective_amount_ops( return get_update_effective_amount_ops(
name, effective_amount, prev_effective_amount, tx_num, position, name, effective_amount, prev_effective_amount, tx_num, position,
root_tx_num, root_position, claim_hash, signing_hash, claims_in_channel_count root_tx_num, root_position, claim_hash, activation, activation, signing_hash,
claims_in_channel_count
) )
def get_claims_in_channel_count(self, channel_hash) -> int: def get_claims_in_channel_count(self, channel_hash) -> int:
@ -415,6 +456,35 @@ class LevelDB:
) )
return expired return expired
def get_controlling_claim(self, name: str) -> Optional[ClaimTakeoverValue]:
controlling = self.db.get(Prefixes.claim_takeover.pack_key(name))
if not controlling:
return
return Prefixes.claim_takeover.unpack_value(controlling)
def get_claims_for_name(self, name: str):
claim_hashes = set()
for k in self.db.iterator(prefix=Prefixes.claim_short_id.prefix + length_encoded_name(name),
include_value=False):
claim_hashes.add(Prefixes.claim_short_id.unpack_key(k).claim_hash)
return claim_hashes
def get_activated_claims_at_height(self, height: int) -> typing.Set[PendingActivationValue]:
claims = set()
prefix = Prefixes.pending_activation.prefix + height.to_bytes(4, byteorder='big')
for _v in self.db.iterator(prefix=prefix, include_key=False):
v = Prefixes.pending_activation.unpack_value(_v)
claims.add(v)
return claims
def get_activation_delay(self, claim_hash: bytes, name: str) -> int:
controlling = self.get_controlling_claim(name)
if not controlling:
return 0
if claim_hash == controlling.claim_hash:
return 0
return get_delay_for_name(self.db_height - controlling.height)
async def _read_tx_counts(self): async def _read_tx_counts(self):
if self.tx_counts is not None: if self.tx_counts is not None:
return return
@ -685,9 +755,9 @@ class LevelDB:
batch_delete(staged_change.key) batch_delete(staged_change.key)
flush_data.claimtrie_stash.clear() flush_data.claimtrie_stash.clear()
for undo_claims, height in flush_data.undo_claimtrie: for undo_ops, height in flush_data.undo:
batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims) batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_ops)
flush_data.undo_claimtrie.clear() flush_data.undo.clear()
self.fs_height = flush_data.height self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count self.fs_tx_count = flush_data.tx_count
@ -788,11 +858,10 @@ class LevelDB:
claim_reorg_height = self.fs_height claim_reorg_height = self.fs_height
# print("flush undos", flush_data.undo_claimtrie) # print("flush undos", flush_data.undo_claimtrie)
for (ops, height) in reversed(flush_data.undo_claimtrie): for (packed_ops, height) in reversed(flush_data.undo):
claimtrie_ops = RevertableOp.unpack_stack(ops) undo_ops = RevertableOp.unpack_stack(packed_ops)
print("%i undo ops for %i" % (len(claimtrie_ops), height)) for op in reversed(undo_ops):
for op in reversed(claimtrie_ops): # print("REWIND", op)
print("REWIND", op)
if op.is_put: if op.is_put:
batch_put(op.key, op.value) batch_put(op.key, op.value)
else: else:
@ -800,7 +869,7 @@ class LevelDB:
batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height)) batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height))
claim_reorg_height -= 1 claim_reorg_height -= 1
flush_data.undo_claimtrie.clear() flush_data.undo.clear()
flush_data.claimtrie_stash.clear() flush_data.claimtrie_stash.clear()
while self.fs_height > flush_data.height: while self.fs_height > flush_data.height:
@ -828,9 +897,9 @@ class LevelDB:
batch_put(key, value) batch_put(key, value)
# New undo information # New undo information
for undo_info, height in flush_data.undo_infos: for undo_info, height in flush_data.undo:
batch.put(self.undo_key(height), b''.join(undo_info)) batch.put(self.undo_key(height), b''.join(undo_info))
flush_data.undo_infos.clear() flush_data.undo.clear()
# Spends # Spends
for key in sorted(flush_data.deletes): for key in sorted(flush_data.deletes):
@ -1023,9 +1092,9 @@ class LevelDB:
"""Returns a height from which we should store undo info.""" """Returns a height from which we should store undo info."""
return max_height - self.env.reorg_limit + 1 return max_height - self.env.reorg_limit + 1
def undo_key(self, height): def undo_key(self, height: int) -> bytes:
"""DB key for undo information at the given height.""" """DB key for undo information at the given height."""
return UNDO_PREFIX + pack('>I', height) return DB_PREFIXES.UNDO_PREFIX.value + pack('>I', height)
def read_undo_info(self, height): def read_undo_info(self, height):
"""Read undo information from a file for the current height.""" """Read undo information from a file for the current height."""