This commit is contained in:
Jack Robison 2021-05-24 12:41:44 -04:00
parent b69faf6920
commit 0a28d216fd
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -206,18 +206,28 @@ class BlockProcessor:
self.history_cache = {}
self.status_server = StatusServer()
# txo to pending claim
self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {}
# claim hash to pending claim txo
self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {}
self.pending_supports = defaultdict(list)
self.pending_support_txos = {}
self.pending_removed_support = defaultdict(lambda: defaultdict(list))
# claim hash to lists of pending support txos
self.pending_supports: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
# support txo: (supported claim hash, support amount)
self.pending_support_txos: Dict[Tuple[int, int], Tuple[bytes, int]] = {}
# removed supports {name: {claim_hash: [(tx_num, nout), ...]}}
self.pending_removed_support: DefaultDict[str, DefaultDict[bytes, List[Tuple[int, int]]]] = defaultdict(
lambda: defaultdict(list))
self.staged_pending_abandoned: Dict[bytes, StagedClaimtrieItem] = {}
self.removed_active_support = defaultdict(list)
self.staged_activated_support = defaultdict(list)
self.staged_activated_claim = {}
self.pending_activated = defaultdict(lambda: defaultdict(list))
# removed activated support amounts by claim hash
self.removed_active_support: DefaultDict[bytes, List[int]] = defaultdict(list)
# pending activated support amounts by claim hash
self.staged_activated_support: DefaultDict[bytes, List[int]] = defaultdict(list)
# pending activated name and claim hash to claim/update txo amount
self.staged_activated_claim: Dict[Tuple[str, bytes], int] = {}
# pending claim and support activations per claim hash per name,
# used to process takeovers due to added activations
self.pending_activated: DefaultDict[str, DefaultDict[bytes, List[Tuple[PendingActivationKey, int]]]] = \
defaultdict(lambda: defaultdict(list))
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
@ -482,10 +492,11 @@ class BlockProcessor:
print(f"\tspent support for lbry://{supported_name}#{spent_support.hex()} activation:{activation} {support_amount}")
return StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops() + StagedActivation(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
support_amount
).get_remove_activate_ops()
).get_spend_support_txo_ops() + \
StagedActivation(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
support_amount
).get_remove_activate_ops()
return []
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]):
@ -575,10 +586,9 @@ class BlockProcessor:
return claim_info[1].name
def _get_pending_supported_amount(self, claim_hash: bytes) -> int:
support_amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.height + 1) or 0
amount = support_amount + sum(
self.staged_activated_support.get(claim_hash, [])
)
amount = self.db._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.height + 1) or 0
if claim_hash in self.staged_activated_support:
amount += sum(self.staged_activated_support[claim_hash])
if claim_hash in self.removed_active_support:
return amount - sum(self.removed_active_support[claim_hash])
return amount
@ -589,13 +599,9 @@ class BlockProcessor:
return claim_amount + support_amount
def _get_takeover_ops(self, height: int) -> List['RevertableOp']:
ops = []
# get takeovers from claims activated at this block
activated_at_height = self.db.get_activated_at_height(height)
# cache for controlling claims as of the previous block
controlling_claims = {}
abandoned_need_takeover = []
abandoned_support_check_need_takeover = defaultdict(list)
def get_controlling(_name):
if _name not in controlling_claims:
@ -605,15 +611,21 @@ class BlockProcessor:
_controlling = controlling_claims[_name]
return _controlling
ops = []
names_with_abandoned_controlling_claims: List[str] = []
# get the claims and supports previously scheduled to be activated at this block
activated_at_height = self.db.get_activated_at_height(height)
# determine names needing takeover/deletion due to controlling claims being abandoned
# and add ops to deactivate abandoned claims
for claim_hash, staged in self.staged_pending_abandoned.items():
controlling = get_controlling(staged.name)
if controlling and controlling.claim_hash == claim_hash:
abandoned_need_takeover.append(staged.name)
names_with_abandoned_controlling_claims.append(staged.name)
print(f"\t{staged.name} needs takeover")
activation = self.db.get_activation(staged.tx_num, staged.position)
if activation > 0:
if activation > 0: # db returns -1 for non-existent txos
# removed queued future activation from the db
ops.extend(
StagedActivation(
@ -622,14 +634,16 @@ class BlockProcessor:
).get_remove_activate_ops()
)
else:
# it hadn't yet been activated, db returns -1 for non-existent txos
# it hadn't yet been activated
pass
# build set of controlling claims that had activated supports spent to check them for takeovers later
# get the removed activated supports for controlling claims to determine if takeovers are possible
abandoned_support_check_need_takeover = defaultdict(list)
for claim_hash, amounts in self.removed_active_support.items():
name = self._get_pending_claim_name(claim_hash)
controlling = get_controlling(name)
if controlling and controlling.claim_hash == claim_hash and name not in abandoned_need_takeover:
if controlling and controlling.claim_hash == claim_hash and \
name not in names_with_abandoned_controlling_claims:
abandoned_support_check_need_takeover[(name, claim_hash)].extend(amounts)
# prepare to activate or delay activation of the pending claims being added this block
@ -637,7 +651,7 @@ class BlockProcessor:
controlling = get_controlling(staged.name)
delay = 0
if not controlling or staged.claim_hash == controlling.claim_hash or \
controlling.claim_hash in abandoned_need_takeover:
controlling.claim_hash in names_with_abandoned_controlling_claims:
pass
else:
controlling_effective_amount = self._get_pending_effective_amount(staged.name, controlling.claim_hash)
@ -736,7 +750,7 @@ class BlockProcessor:
# go through claims where the controlling claim or supports to the controlling claim have been abandoned
# check if takeovers are needed or if the name node is now empty
need_reactivate_if_takes_over = {}
for need_takeover in abandoned_need_takeover:
for need_takeover in names_with_abandoned_controlling_claims:
existing = self.db.get_claim_txos_for_name(need_takeover)
has_candidate = False
# add existing claims to the queue for the takeover
@ -764,7 +778,7 @@ class BlockProcessor:
checked_names = set()
for name, activated in self.pending_activated.items():
checked_names.add(name)
if name in abandoned_need_takeover:
if name in names_with_abandoned_controlling_claims:
print(f'\tabandoned {name} need takeover')
controlling = controlling_claims[name]
amounts = {
@ -774,8 +788,9 @@ class BlockProcessor:
if controlling and controlling.claim_hash not in self.staged_pending_abandoned:
amounts[controlling.claim_hash] = self._get_pending_effective_amount(name, controlling.claim_hash)
winning = max(amounts, key=lambda x: amounts[x])
if not controlling or (winning != controlling.claim_hash and name in abandoned_need_takeover) or ((winning != controlling.claim_hash) and
(amounts[winning] > amounts[controlling.claim_hash])):
if not controlling or (winning != controlling.claim_hash and
name in names_with_abandoned_controlling_claims) or \
((winning != controlling.claim_hash) and (amounts[winning] > amounts[controlling.claim_hash])):
if (name, winning) in need_reactivate_if_takes_over:
previous_pending_activate = need_reactivate_if_takes_over[(name, winning)]
amount = self.db.get_claim_txo_amount(