This commit is contained in:
Jack Robison 2020-11-30 12:19:08 -05:00
parent 70e5ce4806
commit a82abb33ac
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 26 additions and 20 deletions

View file

@ -275,6 +275,7 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
await reopen_rocksdb_ctx(self.db.executor)
async def reorg_hashes(self, count):
"""Return a pair (start, last, hashes) of blocks to back up during a
@ -289,7 +290,7 @@ class BlockProcessor:
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, last, await self.db.fs_block_hashes(start, count)
return start, last, self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count: Optional[int]):
"""Calculate the reorg range"""
@ -307,7 +308,7 @@ class BlockProcessor:
start = self.height - 1
count = 1
while start > 0:
hashes = await self.db.fs_block_hashes(start, count)
hashes = self.db.fs_block_hashes(start, count)
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = diff_pos(hex_hashes, d_hex_hashes)

View file

@ -226,6 +226,11 @@ def read_headers():
]
def read_block_file(path):
    """Return the entire raw contents of the block file at *path*.

    Synchronous helper; callers hand it to an executor (see the
    run_in_executor call in read_raw_block) so the event loop is not
    blocked by disk reads.
    """
    # read(-1) reads to EOF in a single call.
    with util.open_file(path) as block_file:
        data = block_file.read(-1)
    return data
def tx_hash(self, tx_num):
"""Return a par (tx_hash, tx_height) for the given tx number.
@ -773,8 +778,9 @@ class LevelDB:
self.executor, transaction_info_get_batch, txids
)
async def fs_block_hashes(self, height, count):
def fs_block_hashes(self, height, count):
    """Return the hashes of *count* consecutive blocks starting at *height*.

    Hashes are computed from the in-memory header cache, so this is
    synchronous (no disk access).

    Raises:
        self.DBError: if fewer than *count* headers are cached from *height*.
    """
    # Removed leftover debug print("boom") that polluted stdout on the error path.
    if height + count > len(self.headers):
        raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
    return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
@ -823,11 +829,7 @@ class LevelDB:
"""Returns a raw block read from disk. Raises FileNotFoundError
if the block isn't on-disk."""
def read():
with util.open_file(self.raw_block_path(height)) as f:
return f.read(-1)
return await asyncio.get_event_loop().run_in_executor(self.executor, read)
return await asyncio.get_event_loop().run_in_executor(self.executor, read_block_file, self.raw_block_path(height))
def write_raw_block(self, block, height):
"""Write a raw block to disk."""

View file

@ -196,7 +196,7 @@ class MerkleCache:
# Start from the beginning of any final partial segment.
# Retain the value of depth_higher; in practice this is fine
start = self._leaf_start(self.length)
hashes = await self.source_func(start, length - start)
hashes = self.source_func(start, length - start)
self.level[start >> self.depth_higher:] = self._level(hashes)
self.length = length
@ -208,7 +208,7 @@ class MerkleCache:
level = self.level[:length >> self.depth_higher]
leaf_start = self._leaf_start(length)
count = min(self._segment_length(), length - leaf_start)
hashes = await self.source_func(leaf_start, count)
hashes = self.source_func(leaf_start, count)
level += self._level(hashes)
return level
@ -216,7 +216,7 @@ class MerkleCache:
"""Call to initialize the cache to a source of given length."""
self.length = length
self.depth_higher = self.merkle.tree_depth(length) // 2
self.level = self._level(await self.source_func(0, length))
self.level = self._level(self.source_func(0, length))
self.initialized.set()
def truncate(self, length):
@ -250,7 +250,7 @@ class MerkleCache:
await self._extend_to(length)
leaf_start = self._leaf_start(index)
count = min(self._segment_length(), length - leaf_start)
leaf_hashes = await self.source_func(leaf_start, count)
leaf_hashes = self.source_func(leaf_start, count)
if length < self._segment_length():
return self.merkle.branch_and_root(leaf_hashes, index)
level = await self._level_for(length)

View file

@ -2,6 +2,15 @@ import logging
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server.leveldb import proc_ctx
def get_txids(height):
    """Return hex txids (byte-reversed) for every transaction in block *height*.

    NOTE(review): presumably runs inside the db executor, reading the
    per-process context from lbry.wallet.server.leveldb — confirm against
    the run_in_executor caller.
    """
    ctx = proc_ctx.get()
    start, stop = ctx.tx_counts[height - 1], ctx.tx_counts[height]
    txids = []
    for tx_num in range(start, stop):
        raw_hash = ctx.ctx_tx_hash(tx_num)[0]
        txids.append(raw_hash[::-1].hex())
    return txids
class BlockchainReorganizationTests(CommandTestCase):
@ -11,18 +20,12 @@ class BlockchainReorganizationTests(CommandTestCase):
async def assertBlockHash(self, height):
bp = self.conductor.spv_node.server.bp
def get_txids():
return [
bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
]
block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
self.assertEqual(block_hash, (bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids, height)
txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')