From ee39e20e6dd3a9810d7e89bf5c25e07736ac3250 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 16 Dec 2020 01:26:19 -0500 Subject: [PATCH 1/3] logging --- lbry/wallet/server/block_processor.py | 51 ++++++++++++++++++--------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e7076a5a7..646de23c4 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -248,7 +248,7 @@ class BlockProcessor: self.logger.info('chain reorg detected') else: self.logger.info(f'faking a reorg of {count:,d} blocks') - await self.flush(True) + async def get_raw_blocks(last_height, hex_hashes): heights = range(last_height, last_height - len(hex_hashes), -1) @@ -265,17 +265,28 @@ class BlockProcessor: self.touched.discard(None) self.db.flush_backup(self.flush_data(), self.touched) - start, last, hashes = await self.reorg_hashes(count) - # Reverse and convert to hex strings. - hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] - for hex_hashes in chunks(hashes, 50): - raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.run_in_thread_with_lock(flush_backup) - last -= len(raw_blocks) - await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) - await self.prefetcher.reset_height(self.height) - self.reorg_count_metric.inc() + try: + await self.flush(True) + + start, last, hashes = await self.reorg_hashes(count) + # Reverse and convert to hex strings. + hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] + self.logger.info("reorg %i block hashes", len(hashes)) + for hex_hashes in chunks(hashes, 50): + raw_blocks = await get_raw_blocks(last, hex_hashes) + self.logger.info("got %i raw blocks", len(raw_blocks)) + await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) + await self.run_in_thread_with_lock(flush_backup) + last -= len(raw_blocks) + + await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) + await self.prefetcher.reset_height(self.height) + self.reorg_count_metric.inc() + except: + self.logger.exception("boom") + raise + finally: + self.logger.info("done with reorg") async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a @@ -465,6 +476,7 @@ class BlockProcessor: coin = self.coin for raw_block in raw_blocks: + self.logger.info("backup block %i", self.height) # Check and update self.tip block = coin.block(raw_block, self.height) header_hash = coin.header_hash(block.header) @@ -603,12 +615,17 @@ class BlockProcessor: # Value: the UTXO value as a 64-bit unsigned integer udb_key = b'u' + hashX + hdb_key[-6:] utxo_value_packed = self.db.utxo_db.get(udb_key) - if utxo_value_packed: - # Remove both entries for this UTXO - self.db_deletes.append(hdb_key) - self.db_deletes.append(udb_key) - return hashX + tx_num_packed + utxo_value_packed + if utxo_value_packed is None: + self.logger.warning( + "%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX) + ) + raise ChainError(f"{hash_to_hex_str(tx_hash)}:{tx_idx} is not found in UTXO db for {hash_to_hex_str(hashX)}") + # Remove both entries for this UTXO + self.db_deletes.append(hdb_key) + self.db_deletes.append(udb_key) + return hashX + tx_num_packed + utxo_value_packed + self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table') raise ChainError('UTXO {} / {:,d} not found in "h" table' .format(hash_to_hex_str(tx_hash), tx_idx)) From 9bf72910a40d0ecb4deedba35d12534f65cfaf84 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 16 Dec 2020 01:27:03 -0500 Subject: [PATCH 2/3] fix off by one error in reorg --- lbry/wallet/server/block_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 646de23c4..aab99452f 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -505,7 +505,6 @@ class BlockProcessor: undo_entry_len = 12 + HASHX_LEN for tx, tx_hash in reversed(txs): - self.db.total_transactions.pop() for idx, txout in enumerate(tx.outputs): # Spend the TX outputs. Be careful with unspendable # outputs - we didn't save those in the first place. @@ -523,6 +522,8 @@ class BlockProcessor: self.utxo_cache[txin.prev_hash + s_pack(' Date: Wed, 16 Dec 2020 01:28:30 -0500 Subject: [PATCH 3/3] catch any remaining index errors in fs_tx_hash --- lbry/wallet/server/block_processor.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index aab99452f..df40d7ea6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -601,13 +601,17 @@ class BlockProcessor: prefix = b'h' + tx_hash[:4] + idx_packed candidates = {db_key: hashX for db_key, hashX in self.db.utxo_db.iterator(prefix=prefix)} - for hdb_key, hashX in candidates.items(): tx_num_packed = hdb_key[-4:] - if len(candidates) > 1: + tx_num, = unpack('