Merge pull request #2897 from lbryio/reorg-claims

handle claim reorgs in the wallet server
This commit is contained in:
Jack Robison 2020-04-05 17:48:56 -04:00 committed by GitHub
commit 238707bd93
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 26 deletions

View file

@ -55,7 +55,8 @@ class Conductor:
async def start_blockchain(self):
if not self.blockchain_started:
await self.blockchain_node.start()
asyncio.create_task(self.blockchain_node.start())
await self.blockchain_node.running.wait()
await self.blockchain_node.generate(200)
self.blockchain_started = True
@ -255,6 +256,10 @@ class BlockchainNode:
self.rpcport = 9245 + 2 # avoid conflict with default rpc port
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event()
@property
def rpc_url(self):
@ -315,13 +320,27 @@ class BlockchainNode:
f'-port={self.peerport}'
]
self.log.info(' '.join(command))
self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command
)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
while not self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
await self.restart_ready.wait()
try:
self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command
)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbrycrdd', exc_info=e)
async def stop(self, cleanup=True):
self.stopped = True
try:
self.transport.terminate()
await self.protocol.stopped.wait()
@ -330,6 +349,16 @@ class BlockchainNode:
if cleanup:
self.cleanup()
async def clear_mempool(self):
self.restart_ready.clear()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
self.running.clear()
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
def cleanup(self):
shutil.rmtree(self.data_path, ignore_errors=True)
@ -361,6 +390,12 @@ class BlockchainNode:
def get_block_hash(self, block):
return self._cli_cmnd('getblockhash', str(block))
def sendrawtransaction(self, tx):
return self._cli_cmnd('sendrawtransaction', tx)
async def get_block(self, block_hash):
return json.loads(await self._cli_cmnd('getblock', block_hash, '1'))
def get_raw_change_address(self):
return self._cli_cmnd('getrawchangeaddress')

View file

@ -2,7 +2,7 @@ import time
import asyncio
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.db.writer import SQLDB
@ -10,7 +10,7 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES
from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES, REORG_COUNT
class Prefetcher:
@ -219,7 +219,7 @@ class BlockProcessor:
'resetting the prefetcher')
await self.prefetcher.reset_height(self.height)
async def reorg_chain(self, count=None):
async def reorg_chain(self, count: Optional[int] = None):
"""Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
@ -253,7 +253,9 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks)
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
REORG_COUNT.inc()
async def reorg_hashes(self, count):
"""Return a pair (start, last, hashes) of blocks to back up during a
@ -270,7 +272,7 @@ class BlockProcessor:
return start, last, await self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count):
async def calc_reorg_range(self, count: Optional[int]):
"""Calculate the reorg range"""
def diff_pos(hashes1, hashes2):

View file

@ -433,6 +433,15 @@ class SQLDB:
return {r.channel_hash for r in affected_channels}
return set()
def delete_claims_above_height(self, height: int):
claim_hashes = [x[0] for x in self.execute(
"SELECT claim_hash FROM claim WHERE height>?", (height, )
).fetchall()]
while claim_hashes:
batch = set(claim_hashes[:500])
claim_hashes = claim_hashes[500:]
self.delete_claims(batch)
def _clear_claim_metadata(self, claim_hashes: Set[bytes]):
if claim_hashes:
for table in ('tag',): # 'language', 'location', etc

View file

@ -51,7 +51,9 @@ BLOCK_COUNT = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAMESPACE)
REORG_COUNT = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
class PrometheusServer:
def __init__(self):

View file

@ -1,8 +1,11 @@
import logging
from lbry.testcase import IntegrationTestCase
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server.prometheus import REORG_COUNT
class BlockchainReorganizationTests(IntegrationTestCase):
class BlockchainReorganizationTests(CommandTestCase):
VERBOSITY = logging.WARN
@ -13,21 +16,105 @@ class BlockchainReorganizationTests(IntegrationTestCase):
)
async def test_reorg(self):
REORG_COUNT.set(0)
# invalidate current block, move forward 2
self.assertEqual(self.ledger.headers.height, 200)
await self.assertBlockHash(200)
await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode())
self.assertEqual(self.ledger.headers.height, 206)
await self.assertBlockHash(206)
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(2)
await self.ledger.on_header.where(lambda e: e.height == 201)
self.assertEqual(self.ledger.headers.height, 201)
await self.assertBlockHash(200)
await self.assertBlockHash(201)
await self.ledger.on_header.where(lambda e: e.height == 207)
self.assertEqual(self.ledger.headers.height, 207)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
self.assertEqual(1, REORG_COUNT._samples()[0][2])
# invalidate current block, move forward 3
await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode())
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 202)
self.assertEqual(self.ledger.headers.height, 202)
await self.assertBlockHash(200)
await self.assertBlockHash(201)
await self.assertBlockHash(202)
await self.ledger.on_header.where(lambda e: e.height == 208)
self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
self.assertEqual(2, REORG_COUNT._samples()[0][2])
async def test_reorg_change_claim_height(self):
# sanity check
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertListEqual(txos, [])
still_valid = await self.daemon.jsonrpc_stream_create(
'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!')
)
await self.ledger.wait(still_valid)
await self.generate(1)
# create a claim and verify it's returned by claim_search
self.assertEqual(self.ledger.headers.height, 207)
broadcast_tx = await self.daemon.jsonrpc_stream_create(
'hovercraft', '1.0', file_path=self.create_upload_file(data=b'hi!')
)
await self.ledger.wait(broadcast_tx)
await self.generate(1)
await self.ledger.wait(broadcast_tx, self.blockchain.block_expected)
self.assertEqual(self.ledger.headers.height, 208)
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertEqual(1, len(txos))
txo = txos[0]
self.assertEqual(txo.tx_ref.id, broadcast_tx.id)
self.assertEqual(txo.tx_ref.height, 208)
# check that our tx is in block 208 as returned by lbrycrdd
invalidated_block_hash = (await self.ledger.headers.hash(208)).decode()
block_207 = await self.blockchain.get_block(invalidated_block_hash)
self.assertIn(txo.tx_ref.id, block_207['tx'])
self.assertEqual(208, txos[0].tx_ref.height)
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.blockchain.generate(2)
# verify the claim was dropped from block 208 as returned by lbrycrdd
reorg_block_hash = await self.blockchain.get_block_hash(208)
self.assertNotEqual(invalidated_block_hash, reorg_block_hash)
block_207 = await self.blockchain.get_block(reorg_block_hash)
self.assertNotIn(txo.tx_ref.id, block_207['tx'])
# wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
await self.assertBlockHash(209)
client_reorg_block_hash = (await self.ledger.headers.hash(208)).decode()
self.assertEqual(client_reorg_block_hash, reorg_block_hash)
# verify the dropped claim is no longer returned by claim search
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertListEqual(txos, [])
# verify the claim published a block earlier wasn't also reverted
txos, _, _, _ = await self.ledger.claim_search([], name='still-valid')
self.assertEqual(1, len(txos))
self.assertEqual(207, txos[0].tx_ref.height)
# broadcast the claim in a different block
new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode())
self.assertEqual(broadcast_tx.id, new_txid)
await self.blockchain.generate(1)
# wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0)
# verify the claim is in the new block and that it is returned by claim_search
block_210 = await self.blockchain.get_block((await self.ledger.headers.hash(210)).decode())
self.assertIn(txo.tx_ref.id, block_210['tx'])
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertEqual(1, len(txos))
self.assertEqual(txos[0].tx_ref.id, new_txid)
self.assertEqual(210, txos[0].tx_ref.height)
# this should still be unchanged
txos, _, _, _ = await self.ledger.claim_search([], name='still-valid')
self.assertEqual(1, len(txos))
self.assertEqual(207, txos[0].tx_ref.height)