diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 336b1972d..2075eae23 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -191,22 +191,21 @@ class BlockProcessor: else: self.ledger = RegTestLedger + self._caught_up_event: Optional[asyncio.Event] = None + self.height = 0 + self.tip = bytes.fromhex(self.coin.GENESIS_HASH)[::-1] + self.tx_count = 0 + self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.logger = class_logger(__name__, self.__class__.__name__) self.executor = ThreadPoolExecutor(1) # Meta - self.next_cache_check = 0 - self.touched = set() - - # Caches of unflushed items. - self.block_txs = [] - self.undo_infos = [] + self.touched_hashXs: Set[bytes] = set() # UTXO cache self.utxo_cache: Dict[Tuple[bytes, int], bytes] = {} - self.db_deletes = [] # Claimtrie cache self.db_op_stack: Optional[RevertableOpStack] = None @@ -324,8 +323,8 @@ class BlockProcessor: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) if self._caught_up_event.is_set(): - await self.mempool.on_block(self.touched, self.height) - self.touched.clear() + await self.mempool.on_block(self.touched_hashXs, self.height) + self.touched_hashXs.clear() elif hprevs[0] != chain[0]: min_start_height = max(self.height - self.coin.REORG_LIMIT, 0) count = 1 @@ -372,7 +371,7 @@ class BlockProcessor: def flush_data(self): """The data for a flush. The lock must be taken.""" assert self.state_lock.locked() - return FlushData(self.height, self.tx_count, self.block_txs, self.db_op_stack, self.tip) + return FlushData(self.height, self.tx_count, self.db_op_stack, self.tip) async def flush(self): def flush(): @@ -1303,7 +1302,7 @@ class BlockProcessor: self.height -= 1 # self.touched can include other addresses which is # harmless, but remove None. - self.touched.discard(None) + self.touched_hashXs.discard(None) self.db.flush_backup(self.flush_data()) self.clear_after_advance_or_reorg() self.logger.info(f'backed up to height {self.height:,d}') @@ -1311,7 +1310,7 @@ class BlockProcessor: def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]: hashX = self.coin.hashX_from_script(txout.pk_script) if hashX: - self.touched.add(hashX) + self.touched_hashXs.add(hashX) self.utxo_cache[(tx_hash, nout)] = hashX self.db_op_stack.extend([ RevertablePut( @@ -1348,7 +1347,7 @@ class BlockProcessor: ) raise ChainError(f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}") # Remove both entries for this UTXO - self.touched.add(hashX) + self.touched_hashXs.add(hashX) self.db_op_stack.extend([ RevertableDelete(hdb_key, hashX), RevertableDelete(udb_key, utxo_value_packed) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 49ed95a1d..1225e40c3 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -65,7 +65,6 @@ TXO_STRUCT_pack = TXO_STRUCT.pack class FlushData: height = attr.ib() tx_count = attr.ib() - block_txs = attr.ib() put_and_delete_ops = attr.ib() tip = attr.ib() @@ -380,7 +379,9 @@ class LevelDB: ) def get_claim_txo_amount(self, claim_hash: bytes) -> Optional[int]: - v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash)) + claim = self.get_claim_txo(claim_hash) + if claim: + return claim.amount def get_block_hash(self, height: int) -> Optional[bytes]: v = self.db.get(Prefixes.block_hash.pack_key(height))