use RevertableOpStack in _get_takeover_ops

This commit is contained in:
Jack Robison 2021-06-28 14:20:33 -04:00 committed by Victor Shyba
parent 6a46f50a35
commit f01b8c849d
3 changed files with 35 additions and 30 deletions

View file

@ -207,7 +207,7 @@ class BlockProcessor:
self.db_deletes = []
# Claimtrie cache
self.claimtrie_stash = []
self.claimtrie_stash = None
self.undo_claims = []
# If the lock is successfully acquired, in-memory chain state
@ -762,7 +762,7 @@ class BlockProcessor:
support_amount = self._get_pending_supported_amount(claim_hash, height=height)
return claim_amount + support_amount
def _get_takeover_ops(self, height: int) -> List['RevertableOp']:
def _get_takeover_ops(self, height: int):
# cache for controlling claims as of the previous block
controlling_claims = {}
@ -775,7 +775,6 @@ class BlockProcessor:
_controlling = controlling_claims[_name]
return _controlling
ops = []
names_with_abandoned_controlling_claims: List[str] = []
# get the claims and supports previously scheduled to be activated at this block
@ -832,7 +831,7 @@ class BlockProcessor:
activation = self.db.get_activation(staged.tx_num, staged.position)
if activation > 0: # db returns -1 for non-existent txos
# removed queued future activation from the db
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position,
activation, staged.name, staged.amount
@ -855,7 +854,7 @@ class BlockProcessor:
# prepare to activate or delay activation of the pending claims being added this block
for (tx_num, nout), staged in self.pending_claims.items():
ops.extend(get_delayed_activate_ops(
self.claimtrie_stash.extend(get_delayed_activate_ops(
staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False
))
@ -875,7 +874,7 @@ class BlockProcessor:
v = supported_claim_info
name = v.name
staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position)
ops.extend(get_delayed_activate_ops(
self.claimtrie_stash.extend(get_delayed_activate_ops(
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
))
@ -952,7 +951,7 @@ class BlockProcessor:
if not has_candidate:
# remove name takeover entry, the name is now unclaimed
controlling = get_controlling(need_takeover)
ops.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
self.claimtrie_stash.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height))
# scan for possible takeovers out of the accumulated activations, of these make sure there
# aren't any future activations for the taken over names with yet higher amounts, if there are
@ -1043,13 +1042,13 @@ class BlockProcessor:
break
assert None not in (amount, activation)
# update the claim that's activating early
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, activation, name, amount
).get_remove_activate_ops()
)
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, height, name, amount
@ -1059,19 +1058,19 @@ class BlockProcessor:
txo = (k.tx_num, k.position)
if txo in self.possible_future_support_txos[winning_including_future_activations]:
t = ACTIVATED_SUPPORT_TXO_TYPE
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
t, winning_including_future_activations, k.tx_num,
k.position, k.height, name, amount
).get_remove_activate_ops()
)
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
t, winning_including_future_activations, k.tx_num,
k.position, height, name, amount
).get_activate_ops()
)
ops.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
elif not controlling or (winning_claim_hash != controlling.claim_hash and
name in names_with_abandoned_controlling_claims) or \
((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])):
@ -1089,19 +1088,19 @@ class BlockProcessor:
if previous_pending_activate.height > height:
# the claim had a pending activation in the future, move it to now
if tx_num < self.tx_count:
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, previous_pending_activate.height, name, amount
).get_remove_activate_ops()
)
ops.extend(
self.claimtrie_stash.extend(
StagedActivation(
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, height, name, amount
).get_activate_ops()
)
ops.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
elif winning_claim_hash == controlling.claim_hash:
# print("\tstill winning")
pass
@ -1124,7 +1123,7 @@ class BlockProcessor:
winning = max(amounts, key=lambda x: amounts[x])
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
ops.extend(get_takeover_name_ops(name, winning, height, controlling))
self.claimtrie_stash.extend(get_takeover_name_ops(name, winning, height, controlling))
# gather cumulative removed/touched sets to update the search index
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
@ -1155,7 +1154,7 @@ class BlockProcessor:
amount = self._cached_get_effective_amount(touched)
if amount > 0:
prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position
ops.extend(get_remove_effective_amount_ops(
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
name, amount, prev_tx_num, prev_position, touched
))
else:
@ -1163,12 +1162,13 @@ class BlockProcessor:
name, tx_num, position = v.name, v.tx_num, v.position
amt = self._cached_get_effective_amount(touched)
if amt > 0:
ops.extend(get_remove_effective_amount_ops(
self.claimtrie_stash.extend(get_remove_effective_amount_ops(
name, amt, tx_num, position, touched
))
ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
tx_num, position, touched))
return ops
self.claimtrie_stash.extend(
get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
tx_num, position, touched)
)
def advance_block(self, block):
height = self.height + 1
@ -1186,8 +1186,7 @@ class BlockProcessor:
# Use local vars for speed in the loops
put_utxo = self.utxo_cache.__setitem__
claimtrie_stash = RevertableOpStack(self.db.db.get)
claimtrie_stash_extend = claimtrie_stash.extend
claimtrie_stash_extend = self.claimtrie_stash.extend
spend_utxo = self.spend_utxo
undo_info_append = undo_info.append
update_touched = self.touched.update
@ -1251,9 +1250,7 @@ class BlockProcessor:
claimtrie_stash_extend(expired_ops)
# activate claims and process takeovers
takeover_ops = self._get_takeover_ops(height)
if takeover_ops:
claimtrie_stash_extend(takeover_ops)
self._get_takeover_ops(height)
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
_unflushed = self.db.hist_unflushed
@ -1266,8 +1263,7 @@ class BlockProcessor:
self.tx_count = tx_count
self.db.tx_counts.append(self.tx_count)
undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash)
self.claimtrie_stash.extend(claimtrie_stash)
undo_claims = b''.join(op.invert().pack() for op in self.claimtrie_stash)
# print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash)))
if height >= self.daemon.cached_height() - self.env.reorg_limit:
@ -1281,6 +1277,7 @@ class BlockProcessor:
self.db.flush_dbs(self.flush_data())
self.claimtrie_stash.clear()
self.pending_claims.clear()
self.pending_claim_txos.clear()
self.pending_supports.clear()
@ -1528,6 +1525,7 @@ class BlockProcessor:
self._caught_up_event = caught_up_event
try:
await self.db.open_dbs()
self.claimtrie_stash = RevertableOpStack(self.db.db.get)
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count

View file

@ -1,5 +1,6 @@
import typing
import struct
from typing import Union, Tuple, NamedTuple
from lbry.wallet.server.db import DB_PREFIXES
@ -837,5 +838,8 @@ ROW_TYPES = {
}
def auto_decode_item(key: bytes, value: bytes) -> typing.Tuple[typing.NamedTuple, typing.NamedTuple]:
return ROW_TYPES[key[:1]].unpack_item(key, value)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
try:
return ROW_TYPES[key[:1]].unpack_item(key, value)
except KeyError:
return key, value

View file

@ -104,6 +104,9 @@ class RevertableOpStack:
for op in ops:
self.append(op)
def clear(self):
self._items.clear()
def __len__(self):
return sum(map(len, self._items.values()))