diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 173137257..f2deb2728 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -37,7 +37,9 @@ class Env: self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.db_dir = self.required('DB_DIRECTORY') self.db_engine = self.default('DB_ENGINE', 'leveldb') - self.trending_algorithms = list(set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' '))) + self.trending_algorithms = [ + trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending + ] self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None) self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True) self.track_metrics = self.boolean('TRACK_METRICS', False) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 437240072..5eddb6c54 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -14,11 +14,11 @@ import array import ast import os import time +import zlib +import base64 from asyncio import sleep from bisect import bisect_right from collections import namedtuple -from functools import partial -from binascii import unhexlify, hexlify from glob import glob from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor @@ -279,32 +279,38 @@ class LevelDB: b''.join(hashes for hashes, _ in flush_data.block_txs) ) // 32 == flush_data.tx_count - prior_tx_count - # Write the headers, tx counts, and tx hashes + # Write the headers start_time = time.perf_counter() + + with self.headers_db.write_batch() as batch: + batch_put = batch.put + for i, header in enumerate(flush_data.headers): + batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header) + flush_data.headers.clear() + height_start = self.fs_height + 1 tx_num = prior_tx_count - for header, block_hash, (tx_hashes, txs) in zip( - flush_data.headers, flush_data.block_hashes, flush_data.block_txs): - tx_count = self.tx_counts[height_start] - self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) - self.tx_db.put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1]) - self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) - height_start += 1 - offset = 0 - while offset < len(tx_hashes): - self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) - self.tx_db.put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num)) - self.tx_db.put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32]) - tx_num += 1 - offset += 32 + with self.tx_db.write_batch() as batch: + batch_put = batch.put + for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs): + tx_count = self.tx_counts[height_start] + batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1]) + batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) + height_start += 1 + offset = 0 + while offset < len(tx_hashes): + batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) + batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num)) + batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32]) + tx_num += 1 + offset += 32 flush_data.block_txs.clear() flush_data.block_hashes.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count - flush_data.headers.clear() elapsed = time.perf_counter() - start_time self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') @@ -398,7 +404,7 @@ class LevelDB: raise IndexError(f'height {height:,d} out of range') return header - async def read_headers(self, start_height, count): + async def read_headers(self, start_height, count, b16=False, b64=False): """Requires start_height >= 0, count >= 0. Reads as many headers as are available starting at start_height up to count. This would be zero if start_height is beyond self.db_height, for @@ -407,6 +413,7 @@ class LevelDB: Returns a (binary, n) pair where binary is the concatenated binary headers, and n is the count of headers returned. """ + if start_height < 0 or count < 0: raise self.DBError(f'{count:,d} headers starting at ' f'{start_height:,d} not on disk') @@ -415,13 +422,19 @@ class LevelDB: # Read some from disk disk_count = max(0, min(count, self.db_height + 1 - start_height)) if disk_count: - return b''.join( + headers = b''.join( self.headers_db.iterator( start=HEADER_PREFIX + util.pack_be_uint64(start_height), stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count), include_key=False ) - ), disk_count + ) + if b16: + return headers.hex().encode(), disk_count + elif b64: + compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9) + return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count + return headers, disk_count return b'', 0 return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) @@ -464,10 +477,9 @@ class LevelDB: block_txs[tx_height] = list(tx_iterator( start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]), stop=None if tx_height + 1 == len(tx_counts) else - TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]), - include_key=False + TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height] + 1), include_key=False )) - tx_pos = tx_counts[tx_height] - tx_num + tx_pos = tx_num - tx_counts[tx_height - 1] branch, root = branch_and_root(block_txs[tx_height], tx_pos) merkle = { 'block_height': tx_height, diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 2e4c5d0e6..c5f79b116 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1370,11 +1370,9 @@ class LBRYElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = await self.db.read_headers(start_height, count) - compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9) - headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode() if b64 else headers.hex() + headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64) result = { - 'base64' if b64 else 'hex': headers, + 'base64' if b64 else 'hex': headers.decode(), 'count': count, 'max': max_size } diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index af9349e67..5ae8786be 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -9,17 +9,32 @@ class BlockchainReorganizationTests(CommandTestCase): VERBOSITY = logging.WARN async def assertBlockHash(self, height): - self.assertEqual( - (await self.ledger.headers.hash(height)).decode(), - await self.blockchain.get_block_hash(height) - ) + bp = self.conductor.spv_node.server.bp + + def get_txids(): + return [ + bp.db.fs_tx_hash(tx_num)[0][::-1].hex() + for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height]) + ] + + block_hash = await self.blockchain.get_block_hash(height) + + self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) + self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) + + txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids) + txs = await bp.db.fs_transactions(txids) + block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] + self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') + self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') async def test_reorg(self): bp = self.conductor.spv_node.server.bp bp.reorg_count_metric.set(0) # invalidate current block, move forward 2 - self.assertEqual(self.ledger.headers.height, 206) - await self.assertBlockHash(206) + height = 206 + self.assertEqual(self.ledger.headers.height, height) + await self.assertBlockHash(height) await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) await self.blockchain.generate(2) await self.ledger.on_header.where(lambda e: e.height == 207) @@ -37,6 +52,11 @@ class BlockchainReorganizationTests(CommandTestCase): await self.assertBlockHash(207) await self.assertBlockHash(208) self.assertEqual(2, bp.reorg_count_metric._samples()[0][2]) + await self.blockchain.generate(3) + await self.ledger.on_header.where(lambda e: e.height == 211) + await self.assertBlockHash(209) + await self.assertBlockHash(210) + await self.assertBlockHash(211) async def test_reorg_change_claim_height(self): # sanity check @@ -51,6 +71,8 @@ class BlockchainReorganizationTests(CommandTestCase): # create a claim and verify it's returned by claim_search self.assertEqual(self.ledger.headers.height, 207) + await self.assertBlockHash(207) + broadcast_tx = await self.daemon.jsonrpc_stream_create( 'hovercraft', '1.0', file_path=self.create_upload_file(data=b'hi!') ) @@ -58,6 +80,8 @@ class BlockchainReorganizationTests(CommandTestCase): await self.generate(1) await self.ledger.wait(broadcast_tx, self.blockchain.block_expected) self.assertEqual(self.ledger.headers.height, 208) + await self.assertBlockHash(208) + txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft') self.assertEqual(1, len(txos)) txo = txos[0] @@ -75,17 +99,18 @@ class BlockchainReorganizationTests(CommandTestCase): await self.blockchain.clear_mempool() await self.blockchain.generate(2) + # wait for the client to catch up and verify the reorg + await asyncio.wait_for(self.on_header(209), 3.0) + await self.assertBlockHash(207) + await self.assertBlockHash(208) + await self.assertBlockHash(209) + # verify the claim was dropped from block 208 as returned by lbrycrdd reorg_block_hash = await self.blockchain.get_block_hash(208) self.assertNotEqual(invalidated_block_hash, reorg_block_hash) block_207 = await self.blockchain.get_block(reorg_block_hash) self.assertNotIn(txo.tx_ref.id, block_207['tx']) - # wait for the client to catch up and verify the reorg - await asyncio.wait_for(self.on_header(209), 3.0) - await self.assertBlockHash(207) - await self.assertBlockHash(208) - await self.assertBlockHash(209) client_reorg_block_hash = (await self.ledger.headers.hash(208)).decode() self.assertEqual(client_reorg_block_hash, reorg_block_hash)