use RevertableOpStack in _get_takeover_ops
This commit is contained in:
parent
d119fcfc98
commit
1dc961d6eb
3 changed files with 35 additions and 30 deletions
|
@ -207,7 +207,7 @@ class BlockProcessor:
|
|||
self.db_deletes = []
|
||||
|
||||
# Claimtrie cache
|
||||
self.claimtrie_stash = []
|
||||
self.claimtrie_stash = None
|
||||
self.undo_claims = []
|
||||
|
||||
# If the lock is successfully acquired, in-memory chain state
|
||||
|
@ -762,7 +762,7 @@ class BlockProcessor:
|
|||
support_amount = self._get_pending_supported_amount(claim_hash, height=height)
|
||||
return claim_amount + support_amount
|
||||
|
||||
def _get_takeover_ops(self, height: int) -> List['RevertableOp']:
|
||||
def _get_takeover_ops(self, height: int):
|
||||
|
||||
# cache for controlling claims as of the previous block
|
||||
controlling_claims = {}
|
||||
|
@ -775,7 +775,6 @@ class BlockProcessor:
|
|||
_controlling = controlling_claims[_name]
|
||||
return _controlling
|
||||
|
||||
ops = []
|
||||
names_with_abandoned_controlling_claims: List[str] = []
|
||||
|
||||
# get the claims and supports previously scheduled to be activated at this block
|
||||
|
@ -832,7 +831,7 @@ class BlockProcessor:
|
|||
activation = self.db.get_activation(staged.tx_num, staged.position)
|
||||
if activation > 0: # db returns -1 for non-existent txos
|
||||
# removed queued future activation from the db
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
|
||||
activation, staged.name, staged.amount
|
||||
|
@ -855,7 +854,7 @@ class BlockProcessor:
|
|||
|
||||
# prepare to activate or delay activation of the pending claims being added this block
|
||||
for (tx_num, nout), staged in self.pending_claims.items():
|
||||
ops.extend(get_delayed_activate_ops(
|
||||
self.claimtrie_stash.extend(get_delayed_activate_ops(
|
||||
staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False
|
||||
))
|
||||
|
||||
|
@ -875,7 +874,7 @@ class BlockProcessor:
|
|||
v = supported_claim_info
|
||||
name = v.name
|
||||
staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position)
|
||||
ops.extend(get_delayed_activate_ops(
|
||||
self.claimtrie_stash.extend(get_delayed_activate_ops(
|
||||
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
|
||||
))
|
||||
|
||||
|
@ -952,7 +951,7 @@ class BlockProcessor:
|
|||
if not has_candidate:
|
||||
# remove name takeover entry, the name is now unclaimed
|
||||
controlling = get_controlling(need_takeover)
|
||||
ops.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
|
||||
self.claimtrie_stash.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
|
||||
|
||||
# scan for possible takeovers out of the accumulated activations, of these make sure there
|
||||
# aren't any future activations for the taken over names with yet higher amounts, if there are
|
||||
|
@ -1043,13 +1042,13 @@ class BlockProcessor:
|
|||
break
|
||||
assert None not in (amount, activation)
|
||||
# update the claim that's activating early
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
|
||||
position, activation, name, amount
|
||||
).get_remove_activate_ops()
|
||||
)
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
|
||||
position, height, name, amount
|
||||
|
@ -1059,19 +1058,19 @@ class BlockProcessor:
|
|||
txo = (k.tx_num, k.position)
|
||||
if txo in self.possible_future_support_txos[winning_including_future_activations]:
|
||||
t = ACTIVATED_SUPPORT_TXO_TYPE
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
t, winning_including_future_activations, k.tx_num,
|
||||
k.position, k.height, name, amount
|
||||
).get_remove_activate_ops()
|
||||
)
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
t, winning_including_future_activations, k.tx_num,
|
||||
k.position, height, name, amount
|
||||
).get_activate_ops()
|
||||
)
|
||||
ops.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
|
||||
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
|
||||
elif not controlling or (winning_claim_hash != controlling.claim_hash and
|
||||
name in names_with_abandoned_controlling_claims) or \
|
||||
((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])):
|
||||
|
@ -1089,19 +1088,19 @@ class BlockProcessor:
|
|||
if previous_pending_activate.height > height:
|
||||
# the claim had a pending activation in the future, move it to now
|
||||
if tx_num < self.tx_count:
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
|
||||
position, previous_pending_activate.height, name, amount
|
||||
).get_remove_activate_ops()
|
||||
)
|
||||
ops.extend(
|
||||
self.claimtrie_stash.extend(
|
||||
StagedActivation(
|
||||
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
|
||||
position, height, name, amount
|
||||
).get_activate_ops()
|
||||
)
|
||||
ops.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
|
||||
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
|
||||
elif winning_claim_hash == controlling.claim_hash:
|
||||
# print("\tstill winning")
|
||||
pass
|
||||
|
@ -1124,7 +1123,7 @@ class BlockProcessor:
|
|||
winning = max(amounts, key=lambda x: amounts[x])
|
||||
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
|
||||
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
|
||||
ops.extend(get_takeover_name_ops(name, winning, height, controlling))
|
||||
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning, height, controlling))
|
||||
|
||||
# gather cumulative removed/touched sets to update the search index
|
||||
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
|
||||
|
@ -1155,7 +1154,7 @@ class BlockProcessor:
|
|||
amount = self._cached_get_effective_amount(touched)
|
||||
if amount > 0:
|
||||
prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position
|
||||
ops.extend(get_remove_effective_amount_ops(
|
||||
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
|
||||
name, amount, prev_tx_num, prev_position, touched
|
||||
))
|
||||
else:
|
||||
|
@ -1163,12 +1162,13 @@ class BlockProcessor:
|
|||
name, tx_num, position = v.name, v.tx_num, v.position
|
||||
amt = self._cached_get_effective_amount(touched)
|
||||
if amt > 0:
|
||||
ops.extend(get_remove_effective_amount_ops(
|
||||
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
|
||||
name, amt, tx_num, position, touched
|
||||
))
|
||||
ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
|
||||
tx_num, position, touched))
|
||||
return ops
|
||||
self.claimtrie_stash.extend(
|
||||
get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
|
||||
tx_num, position, touched)
|
||||
)
|
||||
|
||||
def advance_block(self, block):
|
||||
height = self.height + 1
|
||||
|
@ -1186,8 +1186,7 @@ class BlockProcessor:
|
|||
|
||||
# Use local vars for speed in the loops
|
||||
put_utxo = self.utxo_cache.__setitem__
|
||||
claimtrie_stash = RevertableOpStack(self.db.db.get)
|
||||
claimtrie_stash_extend = claimtrie_stash.extend
|
||||
claimtrie_stash_extend = self.claimtrie_stash.extend
|
||||
spend_utxo = self.spend_utxo
|
||||
undo_info_append = undo_info.append
|
||||
update_touched = self.touched.update
|
||||
|
@ -1251,9 +1250,7 @@ class BlockProcessor:
|
|||
claimtrie_stash_extend(expired_ops)
|
||||
|
||||
# activate claims and process takeovers
|
||||
takeover_ops = self._get_takeover_ops(height)
|
||||
if takeover_ops:
|
||||
claimtrie_stash_extend(takeover_ops)
|
||||
self._get_takeover_ops(height)
|
||||
|
||||
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
_unflushed = self.db.hist_unflushed
|
||||
|
@ -1266,8 +1263,7 @@ class BlockProcessor:
|
|||
self.tx_count = tx_count
|
||||
self.db.tx_counts.append(self.tx_count)
|
||||
|
||||
undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash)
|
||||
self.claimtrie_stash.extend(claimtrie_stash)
|
||||
undo_claims = b''.join(op.invert().pack() for op in self.claimtrie_stash)
|
||||
# print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash)))
|
||||
|
||||
if height >= self.daemon.cached_height() - self.env.reorg_limit:
|
||||
|
@ -1281,6 +1277,7 @@ class BlockProcessor:
|
|||
|
||||
self.db.flush_dbs(self.flush_data())
|
||||
|
||||
self.claimtrie_stash.clear()
|
||||
self.pending_claims.clear()
|
||||
self.pending_claim_txos.clear()
|
||||
self.pending_supports.clear()
|
||||
|
@ -1528,6 +1525,7 @@ class BlockProcessor:
|
|||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
await self.db.open_dbs()
|
||||
self.claimtrie_stash = RevertableOpStack(self.db.db.get)
|
||||
self.height = self.db.db_height
|
||||
self.tip = self.db.db_tip
|
||||
self.tx_count = self.db.db_tx_count
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import typing
|
||||
import struct
|
||||
from typing import Union, Tuple, NamedTuple
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
|
||||
|
||||
|
@ -837,5 +838,8 @@ ROW_TYPES = {
|
|||
}
|
||||
|
||||
|
||||
def auto_decode_item(key: bytes, value: bytes) -> typing.Tuple[typing.NamedTuple, typing.NamedTuple]:
|
||||
return ROW_TYPES[key[:1]].unpack_item(key, value)
|
||||
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
||||
try:
|
||||
return ROW_TYPES[key[:1]].unpack_item(key, value)
|
||||
except KeyError:
|
||||
return key, value
|
||||
|
|
|
@ -104,6 +104,9 @@ class RevertableOpStack:
|
|||
for op in ops:
|
||||
self.append(op)
|
||||
|
||||
def clear(self):
|
||||
self._items.clear()
|
||||
|
||||
def __len__(self):
|
||||
return sum(map(len, self._items.values()))
|
||||
|
||||
|
|
Loading…
Reference in a new issue