Merge branch 'fix-reorg'

Jack Robison 2020-11-20 13:32:59 -05:00
commit f6b396ae64
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 77 additions and 40 deletions

View file

@@ -37,7 +37,9 @@ class Env:
         self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
         self.db_dir = self.required('DB_DIRECTORY')
         self.db_engine = self.default('DB_ENGINE', 'leveldb')
-        self.trending_algorithms = list(set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')))
+        self.trending_algorithms = [
+            trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
+        ]
         self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None)
         self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True)
         self.track_metrics = self.boolean('TRACK_METRICS', False)
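
The list-comprehension form guards against empty strings: splitting on a single space yields '' entries whenever TRENDING_ALGORITHMS has leading, trailing, or doubled spaces, and the old list(set(...)) kept them. A minimal standalone sketch of the difference (the example value is made up):

    value = 'zscore  '  # hypothetical TRENDING_ALGORITHMS value with stray spaces

    old = list(set(value.split(' ')))                        # ['zscore', ''] in some order
    new = [algo for algo in set(value.split(' ')) if algo]   # ['zscore']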

View file

@@ -14,11 +14,11 @@ import array
 import ast
 import os
 import time
+import zlib
+import base64
 from asyncio import sleep
 from bisect import bisect_right
 from collections import namedtuple
-from functools import partial
-from binascii import unhexlify, hexlify
 from glob import glob
 from struct import pack, unpack
 from concurrent.futures.thread import ThreadPoolExecutor
@@ -279,32 +279,38 @@ class LevelDB:
             b''.join(hashes for hashes, _ in flush_data.block_txs)
         ) // 32 == flush_data.tx_count - prior_tx_count

-        # Write the headers, tx counts, and tx hashes
+        # Write the headers
         start_time = time.perf_counter()
+        with self.headers_db.write_batch() as batch:
+            batch_put = batch.put
+            for i, header in enumerate(flush_data.headers):
+                batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
+            flush_data.headers.clear()

         height_start = self.fs_height + 1
         tx_num = prior_tx_count
-        for header, block_hash, (tx_hashes, txs) in zip(
-                flush_data.headers, flush_data.block_hashes, flush_data.block_txs):
-            tx_count = self.tx_counts[height_start]
-            self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
-            self.tx_db.put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
-            self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
-            height_start += 1
-            offset = 0
-            while offset < len(tx_hashes):
-                self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
-                self.tx_db.put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
-                self.tx_db.put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
-                tx_num += 1
-                offset += 32
+        with self.tx_db.write_batch() as batch:
+            batch_put = batch.put
+            for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
+                tx_count = self.tx_counts[height_start]
+                batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
+                batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
+                height_start += 1
+                offset = 0
+                while offset < len(tx_hashes):
+                    batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                    batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
+                    batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
+                    tx_num += 1
+                    offset += 32

         flush_data.block_txs.clear()
         flush_data.block_hashes.clear()

         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
-        flush_data.headers.clear()
         elapsed = time.perf_counter() - start_time
         self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
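
The rewrite above replaces per-key self.headers_db.put(...) and self.tx_db.put(...) calls with one LevelDB write batch per database, so each flush becomes a single buffered, atomic write instead of thousands of individual puts. A minimal sketch of the pattern, assuming a plyvel-style handle (the path and keys below are made up):

    import plyvel  # assumption: headers_db / tx_db are plyvel-like LevelDB handles

    db = plyvel.DB('/tmp/example-db', create_if_missing=True)

    # Buffer many small writes and commit them as one atomic batch when the
    # `with` block exits, which is what the flush code above now does.
    with db.write_batch() as batch:
        batch_put = batch.put  # bind the method once, as the diff does
        for i in range(1000):
            batch_put(b'key-%d' % i, b'value-%d' % i)

    db.close()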
@@ -398,7 +404,7 @@ class LevelDB:
             raise IndexError(f'height {height:,d} out of range')
         return header

-    async def read_headers(self, start_height, count):
+    async def read_headers(self, start_height, count, b16=False, b64=False):
         """Requires start_height >= 0, count >= 0. Reads as many headers as
         are available starting at start_height up to count. This
         would be zero if start_height is beyond self.db_height, for
@@ -407,6 +413,7 @@ class LevelDB:
         Returns a (binary, n) pair where binary is the concatenated
         binary headers, and n is the count of headers returned.
         """
+
         if start_height < 0 or count < 0:
             raise self.DBError(f'{count:,d} headers starting at '
                                f'{start_height:,d} not on disk')
@@ -415,13 +422,19 @@ class LevelDB:
             # Read some from disk
             disk_count = max(0, min(count, self.db_height + 1 - start_height))
             if disk_count:
-                return b''.join(
+                headers = b''.join(
                     self.headers_db.iterator(
                         start=HEADER_PREFIX + util.pack_be_uint64(start_height),
                         stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count),
                         include_key=False
                     )
-                ), disk_count
+                )
+                if b16:
+                    return headers.hex().encode(), disk_count
+                elif b64:
+                    compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
+                    return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count
+                return headers, disk_count
             return b'', 0

         return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
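
With the new flags, read_headers returns ASCII bytes: hex-encoded when b16 is set, or raw-deflate-compressed and base64-encoded when b64 is set. A small sketch of how a consumer could undo the b64 branch (stdlib only; the helper name and sample bytes are mine):

    import base64
    import zlib

    def decode_b64_headers(payload: bytes) -> bytes:
        # wbits=-15 matches the compressobj above: a raw deflate stream with
        # no zlib header or checksum.
        return zlib.decompress(base64.b64decode(payload), wbits=-15)

    # Round trip using the same parameters as read_headers:
    raw = b'\x00' * 112  # stand-in for a concatenated run of block headers
    compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
    encoded = base64.b64encode(compressobj.compress(raw) + compressobj.flush())
    assert decode_b64_headers(encoded) == raw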
@@ -464,10 +477,9 @@ class LevelDB:
                 block_txs[tx_height] = list(tx_iterator(
                     start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]),
                     stop=None if tx_height + 1 == len(tx_counts) else
-                    TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]),
-                    include_key=False
+                    TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height] + 1), include_key=False
                 ))
-            tx_pos = tx_counts[tx_height] - tx_num
+            tx_pos = tx_num - tx_counts[tx_height - 1]
             branch, root = branch_and_root(block_txs[tx_height], tx_pos)
             merkle = {
                 'block_height': tx_height,
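
The tx_pos change matters because tx_counts is cumulative: tx_counts[h] is the number of transactions up to and including height h, so a transaction's position inside its block is its global number minus the previous height's cumulative count. The old expression measured from the block's end rather than its start, handing branch_and_root the wrong position. A tiny worked example with made-up numbers:

    tx_counts = {99: 100, 100: 103}   # cumulative tx count by height (hypothetical)
    tx_height = 100                   # this block holds global tx numbers 100, 101, 102
    tx_num = 101                      # the block's second transaction

    old_tx_pos = tx_counts[tx_height] - tx_num        # 2 -- counts from the block's end
    new_tx_pos = tx_num - tx_counts[tx_height - 1]    # 1 -- offset from the block's start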

View file

@@ -1370,11 +1370,9 @@ class LBRYElectrumX(SessionBase):
         max_size = self.MAX_CHUNK_SIZE
         count = min(count, max_size)

-        headers, count = await self.db.read_headers(start_height, count)
-        compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
-        headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode() if b64 else headers.hex()
+        headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64)
         result = {
-            'base64' if b64 else 'hex': headers,
+            'base64' if b64 else 'hex': headers.decode(),
             'count': count,
             'max': max_size
         }
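
Since read_headers now does the encoding itself (inside the DB executor rather than on the event loop) and returns ASCII bytes in both modes, the handler only needs headers.decode() to build the JSON result. A minimal sketch of the hex path (the max_size value is made up):

    raw = b'\x00' * 112                    # stand-in for one raw block header
    max_size = 96000                       # hypothetical MAX_CHUNK_SIZE

    headers = raw.hex().encode()           # what read_headers returns when b16=True
    result = {'hex': headers.decode(), 'count': 1, 'max': max_size}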

View file

@@ -9,17 +9,32 @@ class BlockchainReorganizationTests(CommandTestCase):
     VERBOSITY = logging.WARN

     async def assertBlockHash(self, height):
-        self.assertEqual(
-            (await self.ledger.headers.hash(height)).decode(),
-            await self.blockchain.get_block_hash(height)
-        )
+        bp = self.conductor.spv_node.server.bp
+
+        def get_txids():
+            return [
+                bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
+                for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
+            ]
+
+        block_hash = await self.blockchain.get_block_hash(height)
+        self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
+        self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
+
+        txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
+        txs = await bp.db.fs_transactions(txids)
+        block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
+
+        self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
+        self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')

     async def test_reorg(self):
         bp = self.conductor.spv_node.server.bp
         bp.reorg_count_metric.set(0)
         # invalidate current block, move forward 2
-        self.assertEqual(self.ledger.headers.height, 206)
-        await self.assertBlockHash(206)
+        height = 206
+        self.assertEqual(self.ledger.headers.height, height)
+        await self.assertBlockHash(height)
         await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
         await self.blockchain.generate(2)
         await self.ledger.on_header.where(lambda e: e.height == 207)
@@ -37,6 +52,11 @@ class BlockchainReorganizationTests(CommandTestCase):
         await self.assertBlockHash(207)
         await self.assertBlockHash(208)
         self.assertEqual(2, bp.reorg_count_metric._samples()[0][2])
+        await self.blockchain.generate(3)
+        await self.ledger.on_header.where(lambda e: e.height == 211)
+        await self.assertBlockHash(209)
+        await self.assertBlockHash(210)
+        await self.assertBlockHash(211)

     async def test_reorg_change_claim_height(self):
         # sanity check
@@ -51,6 +71,8 @@ class BlockchainReorganizationTests(CommandTestCase):
         # create a claim and verify it's returned by claim_search
         self.assertEqual(self.ledger.headers.height, 207)
+        await self.assertBlockHash(207)
+
         broadcast_tx = await self.daemon.jsonrpc_stream_create(
             'hovercraft', '1.0', file_path=self.create_upload_file(data=b'hi!')
         )
@@ -58,6 +80,8 @@ class BlockchainReorganizationTests(CommandTestCase):
         await self.generate(1)
         await self.ledger.wait(broadcast_tx, self.blockchain.block_expected)
         self.assertEqual(self.ledger.headers.height, 208)
+        await self.assertBlockHash(208)
+
         txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
         self.assertEqual(1, len(txos))
         txo = txos[0]
@@ -75,17 +99,18 @@ class BlockchainReorganizationTests(CommandTestCase):
         await self.blockchain.clear_mempool()
         await self.blockchain.generate(2)

+        # wait for the client to catch up and verify the reorg
+        await asyncio.wait_for(self.on_header(209), 3.0)
+        await self.assertBlockHash(207)
+        await self.assertBlockHash(208)
+        await self.assertBlockHash(209)
+
         # verify the claim was dropped from block 208 as returned by lbrycrdd
         reorg_block_hash = await self.blockchain.get_block_hash(208)
         self.assertNotEqual(invalidated_block_hash, reorg_block_hash)
         block_207 = await self.blockchain.get_block(reorg_block_hash)
         self.assertNotIn(txo.tx_ref.id, block_207['tx'])

-        # wait for the client to catch up and verify the reorg
-        await asyncio.wait_for(self.on_header(209), 3.0)
-        await self.assertBlockHash(207)
-        await self.assertBlockHash(208)
-        await self.assertBlockHash(209)
         client_reorg_block_hash = (await self.ledger.headers.hash(208)).decode()
         self.assertEqual(client_reorg_block_hash, reorg_block_hash)