diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index a51c70ca2..b054e5e97 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -207,7 +207,7 @@ class BlockProcessor: self.db_deletes = [] # Claimtrie cache - self.claimtrie_stash = [] + self.claimtrie_stash = None self.undo_claims = [] # If the lock is successfully acquired, in-memory chain state @@ -762,7 +762,7 @@ class BlockProcessor: support_amount = self._get_pending_supported_amount(claim_hash, height=height) return claim_amount + support_amount - def _get_takeover_ops(self, height: int) -> List['RevertableOp']: + def _get_takeover_ops(self, height: int): # cache for controlling claims as of the previous block controlling_claims = {} @@ -775,7 +775,6 @@ class BlockProcessor: _controlling = controlling_claims[_name] return _controlling - ops = [] names_with_abandoned_controlling_claims: List[str] = [] # get the claims and supports previously scheduled to be activated at this block @@ -832,7 +831,7 @@ class BlockProcessor: activation = self.db.get_activation(staged.tx_num, staged.position) if activation > 0: # db returns -1 for non-existent txos # removed queued future activation from the db - ops.extend( + self.claimtrie_stash.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, staged.claim_hash, staged.tx_num, staged.position, activation, staged.name, staged.amount @@ -855,7 +854,7 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.pending_claims.items(): - ops.extend(get_delayed_activate_ops( + self.claimtrie_stash.extend(get_delayed_activate_ops( staged.name, staged.claim_hash, not staged.is_update, tx_num, nout, staged.amount, is_support=False )) @@ -875,7 +874,7 @@ class BlockProcessor: v = supported_claim_info name = v.name staged_is_new_claim = (v.root_tx_num, v.root_position) == (v.tx_num, v.position) - ops.extend(get_delayed_activate_ops( + self.claimtrie_stash.extend(get_delayed_activate_ops( name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True )) @@ -952,7 +951,7 @@ class BlockProcessor: if not has_candidate: # remove name takeover entry, the name is now unclaimed controlling = get_controlling(need_takeover) - ops.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) + self.claimtrie_stash.extend(get_remove_name_ops(need_takeover, controlling.claim_hash, controlling.height)) # scan for possible takeovers out of the accumulated activations, of these make sure there # aren't any future activations for the taken over names with yet higher amounts, if there are @@ -1043,13 +1042,13 @@ class BlockProcessor: break assert None not in (amount, activation) # update the claim that's activating early - ops.extend( + self.claimtrie_stash.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, activation, name, amount ).get_remove_activate_ops() ) - ops.extend( + self.claimtrie_stash.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, height, name, amount @@ -1059,19 +1058,19 @@ class BlockProcessor: txo = (k.tx_num, k.position) if txo in self.possible_future_support_txos[winning_including_future_activations]: t = ACTIVATED_SUPPORT_TXO_TYPE - ops.extend( + self.claimtrie_stash.extend( StagedActivation( t, winning_including_future_activations, k.tx_num, k.position, k.height, name, amount ).get_remove_activate_ops() ) - ops.extend( + self.claimtrie_stash.extend( StagedActivation( t, winning_including_future_activations, k.tx_num, k.position, height, name, amount ).get_activate_ops() ) - ops.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) + self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) elif not controlling or (winning_claim_hash != controlling.claim_hash and name in names_with_abandoned_controlling_claims) or \ ((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])): @@ -1089,19 +1088,19 @@ class BlockProcessor: if previous_pending_activate.height > height: # the claim had a pending activation in the future, move it to now if tx_num < self.tx_count: - ops.extend( + self.claimtrie_stash.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, previous_pending_activate.height, name, amount ).get_remove_activate_ops() ) - ops.extend( + self.claimtrie_stash.extend( StagedActivation( ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, height, name, amount ).get_activate_ops() ) - ops.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) + self.claimtrie_stash.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) elif winning_claim_hash == controlling.claim_hash: # print("\tstill winning") pass @@ -1124,7 +1123,7 @@ class BlockProcessor: winning = max(amounts, key=lambda x: amounts[x]) if (controlling and winning != controlling.claim_hash) or (not controlling and winning): # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") - ops.extend(get_takeover_name_ops(name, winning, height, controlling)) + self.claimtrie_stash.extend(get_takeover_name_ops(name, winning, height, controlling)) # gather cumulative removed/touched sets to update the search index self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys())) @@ -1155,7 +1154,7 @@ class BlockProcessor: amount = self._cached_get_effective_amount(touched) if amount > 0: prev_tx_num, prev_position = claim_from_db.tx_num, claim_from_db.position - ops.extend(get_remove_effective_amount_ops( + self.claimtrie_stash.extend(get_remove_effective_amount_ops( name, amount, prev_tx_num, prev_position, touched )) else: @@ -1163,12 +1162,13 @@ class BlockProcessor: name, tx_num, position = v.name, v.tx_num, v.position amt = self._cached_get_effective_amount(touched) if amt > 0: - ops.extend(get_remove_effective_amount_ops( + self.claimtrie_stash.extend(get_remove_effective_amount_ops( name, amt, tx_num, position, touched )) - ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), - tx_num, position, touched)) - return ops + self.claimtrie_stash.extend( + get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), + tx_num, position, touched) + ) def advance_block(self, block): height = self.height + 1 @@ -1186,8 +1186,7 @@ class BlockProcessor: # Use local vars for speed in the loops put_utxo = self.utxo_cache.__setitem__ - claimtrie_stash = RevertableOpStack(self.db.db.get) - claimtrie_stash_extend = claimtrie_stash.extend + claimtrie_stash_extend = self.claimtrie_stash.extend spend_utxo = self.spend_utxo undo_info_append = undo_info.append update_touched = self.touched.update @@ -1251,9 +1250,7 @@ class BlockProcessor: claimtrie_stash_extend(expired_ops) # activate claims and process takeovers - takeover_ops = self._get_takeover_ops(height) - if takeover_ops: - claimtrie_stash_extend(takeover_ops) + self._get_takeover_ops(height) # self.db.add_unflushed(hashXs_by_tx, self.tx_count) _unflushed = self.db.hist_unflushed @@ -1266,8 +1263,7 @@ class BlockProcessor: self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) - undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash) - self.claimtrie_stash.extend(claimtrie_stash) + undo_claims = b''.join(op.invert().pack() for op in self.claimtrie_stash) # print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash))) if height >= self.daemon.cached_height() - self.env.reorg_limit: @@ -1281,6 +1277,7 @@ class BlockProcessor: self.db.flush_dbs(self.flush_data()) + self.claimtrie_stash.clear() self.pending_claims.clear() self.pending_claim_txos.clear() self.pending_supports.clear() @@ -1528,6 +1525,7 @@ class BlockProcessor: self._caught_up_event = caught_up_event try: await self.db.open_dbs() + self.claimtrie_stash = RevertableOpStack(self.db.db.get) self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index c591b5761..7a6d8904c 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -1,5 +1,6 @@ import typing import struct +from typing import Union, Tuple, NamedTuple from lbry.wallet.server.db import DB_PREFIXES @@ -837,5 +838,8 @@ ROW_TYPES = { } -def auto_decode_item(key: bytes, value: bytes) -> typing.Tuple[typing.NamedTuple, typing.NamedTuple]: - return ROW_TYPES[key[:1]].unpack_item(key, value) +def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: + try: + return ROW_TYPES[key[:1]].unpack_item(key, value) + except KeyError: + return key, value diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 68a000b1f..232565cbf 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -104,6 +104,9 @@ class RevertableOpStack: for op in ops: self.append(op) + def clear(self): + self._items.clear() + def __len__(self): return sum(map(len, self._items.values()))