Merge pull request #3120 from lbryio/fix-reorg-crash
Fix off by one in wallet server reorg
This commit is contained in:
commit
09ee104b8c
1 changed files with 44 additions and 22 deletions
|
@ -248,7 +248,7 @@ class BlockProcessor:
|
||||||
self.logger.info('chain reorg detected')
|
self.logger.info('chain reorg detected')
|
||||||
else:
|
else:
|
||||||
self.logger.info(f'faking a reorg of {count:,d} blocks')
|
self.logger.info(f'faking a reorg of {count:,d} blocks')
|
||||||
await self.flush(True)
|
|
||||||
|
|
||||||
async def get_raw_blocks(last_height, hex_hashes):
|
async def get_raw_blocks(last_height, hex_hashes):
|
||||||
heights = range(last_height, last_height - len(hex_hashes), -1)
|
heights = range(last_height, last_height - len(hex_hashes), -1)
|
||||||
|
@ -265,17 +265,28 @@ class BlockProcessor:
|
||||||
self.touched.discard(None)
|
self.touched.discard(None)
|
||||||
self.db.flush_backup(self.flush_data(), self.touched)
|
self.db.flush_backup(self.flush_data(), self.touched)
|
||||||
|
|
||||||
start, last, hashes = await self.reorg_hashes(count)
|
try:
|
||||||
# Reverse and convert to hex strings.
|
await self.flush(True)
|
||||||
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
|
|
||||||
for hex_hashes in chunks(hashes, 50):
|
start, last, hashes = await self.reorg_hashes(count)
|
||||||
raw_blocks = await get_raw_blocks(last, hex_hashes)
|
# Reverse and convert to hex strings.
|
||||||
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
|
||||||
await self.run_in_thread_with_lock(flush_backup)
|
self.logger.info("reorg %i block hashes", len(hashes))
|
||||||
last -= len(raw_blocks)
|
for hex_hashes in chunks(hashes, 50):
|
||||||
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
|
raw_blocks = await get_raw_blocks(last, hex_hashes)
|
||||||
await self.prefetcher.reset_height(self.height)
|
self.logger.info("got %i raw blocks", len(raw_blocks))
|
||||||
self.reorg_count_metric.inc()
|
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
||||||
|
await self.run_in_thread_with_lock(flush_backup)
|
||||||
|
last -= len(raw_blocks)
|
||||||
|
|
||||||
|
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
|
||||||
|
await self.prefetcher.reset_height(self.height)
|
||||||
|
self.reorg_count_metric.inc()
|
||||||
|
except:
|
||||||
|
self.logger.exception("boom")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self.logger.info("done with reorg")
|
||||||
|
|
||||||
async def reorg_hashes(self, count):
|
async def reorg_hashes(self, count):
|
||||||
"""Return a pair (start, last, hashes) of blocks to back up during a
|
"""Return a pair (start, last, hashes) of blocks to back up during a
|
||||||
|
@ -465,6 +476,7 @@ class BlockProcessor:
|
||||||
|
|
||||||
coin = self.coin
|
coin = self.coin
|
||||||
for raw_block in raw_blocks:
|
for raw_block in raw_blocks:
|
||||||
|
self.logger.info("backup block %i", self.height)
|
||||||
# Check and update self.tip
|
# Check and update self.tip
|
||||||
block = coin.block(raw_block, self.height)
|
block = coin.block(raw_block, self.height)
|
||||||
header_hash = coin.header_hash(block.header)
|
header_hash = coin.header_hash(block.header)
|
||||||
|
@ -493,7 +505,6 @@ class BlockProcessor:
|
||||||
undo_entry_len = 12 + HASHX_LEN
|
undo_entry_len = 12 + HASHX_LEN
|
||||||
|
|
||||||
for tx, tx_hash in reversed(txs):
|
for tx, tx_hash in reversed(txs):
|
||||||
self.db.total_transactions.pop()
|
|
||||||
for idx, txout in enumerate(tx.outputs):
|
for idx, txout in enumerate(tx.outputs):
|
||||||
# Spend the TX outputs. Be careful with unspendable
|
# Spend the TX outputs. Be careful with unspendable
|
||||||
# outputs - we didn't save those in the first place.
|
# outputs - we didn't save those in the first place.
|
||||||
|
@ -511,6 +522,8 @@ class BlockProcessor:
|
||||||
self.utxo_cache[txin.prev_hash + s_pack('<H', txin.prev_idx)] = undo_item
|
self.utxo_cache[txin.prev_hash + s_pack('<H', txin.prev_idx)] = undo_item
|
||||||
self.touched.add(undo_item[:-12])
|
self.touched.add(undo_item[:-12])
|
||||||
|
|
||||||
|
self.db.total_transactions.pop()
|
||||||
|
|
||||||
assert n == 0
|
assert n == 0
|
||||||
self.tx_count -= len(txs)
|
self.tx_count -= len(txs)
|
||||||
|
|
||||||
|
@ -588,13 +601,17 @@ class BlockProcessor:
|
||||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||||
candidates = {db_key: hashX for db_key, hashX
|
candidates = {db_key: hashX for db_key, hashX
|
||||||
in self.db.utxo_db.iterator(prefix=prefix)}
|
in self.db.utxo_db.iterator(prefix=prefix)}
|
||||||
|
|
||||||
for hdb_key, hashX in candidates.items():
|
for hdb_key, hashX in candidates.items():
|
||||||
tx_num_packed = hdb_key[-4:]
|
tx_num_packed = hdb_key[-4:]
|
||||||
|
|
||||||
if len(candidates) > 1:
|
if len(candidates) > 1:
|
||||||
|
|
||||||
tx_num, = unpack('<I', tx_num_packed)
|
tx_num, = unpack('<I', tx_num_packed)
|
||||||
hash, height = self.db.fs_tx_hash(tx_num)
|
try:
|
||||||
|
hash, height = self.db.fs_tx_hash(tx_num)
|
||||||
|
except IndexError:
|
||||||
|
self.logger.error("data integrity error for hashx history: %s missing tx #%s (%s:%s)",
|
||||||
|
hashX.hex(), tx_num, hash_to_hex_str(tx_hash), tx_idx)
|
||||||
|
continue
|
||||||
if hash != tx_hash:
|
if hash != tx_hash:
|
||||||
assert hash is not None # Should always be found
|
assert hash is not None # Should always be found
|
||||||
continue
|
continue
|
||||||
|
@ -603,12 +620,17 @@ class BlockProcessor:
|
||||||
# Value: the UTXO value as a 64-bit unsigned integer
|
# Value: the UTXO value as a 64-bit unsigned integer
|
||||||
udb_key = b'u' + hashX + hdb_key[-6:]
|
udb_key = b'u' + hashX + hdb_key[-6:]
|
||||||
utxo_value_packed = self.db.utxo_db.get(udb_key)
|
utxo_value_packed = self.db.utxo_db.get(udb_key)
|
||||||
if utxo_value_packed:
|
if utxo_value_packed is None:
|
||||||
# Remove both entries for this UTXO
|
self.logger.warning(
|
||||||
self.db_deletes.append(hdb_key)
|
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)
|
||||||
self.db_deletes.append(udb_key)
|
)
|
||||||
return hashX + tx_num_packed + utxo_value_packed
|
raise ChainError(f"{hash_to_hex_str(tx_hash)}:{tx_idx} is not found in UTXO db for {hash_to_hex_str(hashX)}")
|
||||||
|
# Remove both entries for this UTXO
|
||||||
|
self.db_deletes.append(hdb_key)
|
||||||
|
self.db_deletes.append(udb_key)
|
||||||
|
return hashX + tx_num_packed + utxo_value_packed
|
||||||
|
|
||||||
|
self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')
|
||||||
raise ChainError('UTXO {} / {:,d} not found in "h" table'
|
raise ChainError('UTXO {} / {:,d} not found in "h" table'
|
||||||
.format(hash_to_hex_str(tx_hash), tx_idx))
|
.format(hash_to_hex_str(tx_hash), tx_idx))
|
||||||
|
|
||||||
|
@ -621,7 +643,7 @@ class BlockProcessor:
|
||||||
self._caught_up_event.set()
|
self._caught_up_event.set()
|
||||||
await self.blocks_event.wait()
|
await self.blocks_event.wait()
|
||||||
self.blocks_event.clear()
|
self.blocks_event.clear()
|
||||||
if self.reorg_count:
|
if self.reorg_count: # this could only happen by calling the reorg rpc
|
||||||
await self.reorg_chain(self.reorg_count)
|
await self.reorg_chain(self.reorg_count)
|
||||||
self.reorg_count = 0
|
self.reorg_count = 0
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in a new issue