diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 90d8cc039..13bc70e71 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -275,6 +275,7 @@ class BlockProcessor: await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) await self.prefetcher.reset_height(self.height) self.reorg_count_metric.inc() + await reopen_rocksdb_ctx(self.db.executor) async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a @@ -289,7 +290,7 @@ class BlockProcessor: self.logger.info(f'chain was reorganised replacing {count:,d} ' f'block{s} at heights {start:,d}-{last:,d}') - return start, last, await self.db.fs_block_hashes(start, count) + return start, last, self.db.fs_block_hashes(start, count) async def calc_reorg_range(self, count: Optional[int]): """Calculate the reorg range""" @@ -307,7 +308,7 @@ class BlockProcessor: start = self.height - 1 count = 1 while start > 0: - hashes = await self.db.fs_block_hashes(start, count) + hashes = self.db.fs_block_hashes(start, count) hex_hashes = [hash_to_hex_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = diff_pos(hex_hashes, d_hex_hashes) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 35ae51a83..1ba902c41 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -226,6 +226,11 @@ def read_headers(): ] +def read_block_file(path): + with util.open_file(path) as f: + return f.read(-1) + + def tx_hash(self, tx_num): """Return a par (tx_hash, tx_height) for the given tx number. @@ -773,8 +778,9 @@ class LevelDB: self.executor, transaction_info_get_batch, txids ) - async def fs_block_hashes(self, height, count): + def fs_block_hashes(self, height, count): if height + count > len(self.headers): + print("boom") raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') return [self.coin.header_hash(header) for header in self.headers[height:height + count]] @@ -823,11 +829,7 @@ class LevelDB: """Returns a raw block read from disk. Raises FileNotFoundError if the block isn't on-disk.""" - def read(): - with util.open_file(self.raw_block_path(height)) as f: - return f.read(-1) - - return await asyncio.get_event_loop().run_in_executor(self.executor, read) + return await asyncio.get_event_loop().run_in_executor(self.executor, read_block_file, self.raw_block_path(height)) def write_raw_block(self, block, height): """Write a raw block to disk.""" diff --git a/lbry/wallet/server/merkle.py b/lbry/wallet/server/merkle.py index 8cf1ca08b..02b9b3c3b 100644 --- a/lbry/wallet/server/merkle.py +++ b/lbry/wallet/server/merkle.py @@ -196,7 +196,7 @@ class MerkleCache: # Start from the beginning of any final partial segment. # Retain the value of depth_higher; in practice this is fine start = self._leaf_start(self.length) - hashes = await self.source_func(start, length - start) + hashes = self.source_func(start, length - start) self.level[start >> self.depth_higher:] = self._level(hashes) self.length = length @@ -208,7 +208,7 @@ class MerkleCache: level = self.level[:length >> self.depth_higher] leaf_start = self._leaf_start(length) count = min(self._segment_length(), length - leaf_start) - hashes = await self.source_func(leaf_start, count) + hashes = self.source_func(leaf_start, count) level += self._level(hashes) return level @@ -216,7 +216,7 @@ class MerkleCache: """Call to initialize the cache to a source of given length.""" self.length = length self.depth_higher = self.merkle.tree_depth(length) // 2 - self.level = self._level(await self.source_func(0, length)) + self.level = self._level(self.source_func(0, length)) self.initialized.set() def truncate(self, length): @@ -250,7 +250,7 @@ class MerkleCache: await self._extend_to(length) leaf_start = self._leaf_start(index) count = min(self._segment_length(), length - leaf_start) - leaf_hashes = await self.source_func(leaf_start, count) + leaf_hashes = self.source_func(leaf_start, count) if length < self._segment_length(): return self.merkle.branch_and_root(leaf_hashes, index) level = await self._level_for(length) diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 5ae8786be..d7df87b3b 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -2,6 +2,15 @@ import logging import asyncio from binascii import hexlify from lbry.testcase import CommandTestCase +from lbry.wallet.server.leveldb import proc_ctx + + +def get_txids(height): + ctx = proc_ctx.get() + return [ + ctx.ctx_tx_hash(tx_num)[0][::-1].hex() + for tx_num in range(ctx.tx_counts[height - 1], ctx.tx_counts[height]) + ] class BlockchainReorganizationTests(CommandTestCase): @@ -11,18 +20,12 @@ class BlockchainReorganizationTests(CommandTestCase): async def assertBlockHash(self, height): bp = self.conductor.spv_node.server.bp - def get_txids(): - return [ - bp.db.fs_tx_hash(tx_num)[0][::-1].hex() - for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height]) - ] - block_hash = await self.blockchain.get_block_hash(height) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) - self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) + self.assertEqual(block_hash, (bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) - txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids) + txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids, height) txs = await bp.db.fs_transactions(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')