diff --git a/lbry/blockchain/database.py b/lbry/blockchain/database.py index 9360f1553..3d74fe412 100644 --- a/lbry/blockchain/database.py +++ b/lbry/blockchain/database.py @@ -112,7 +112,8 @@ class BlockchainDB: file as file_number, COUNT(hash) as blocks, SUM(txcount) as txs, - MAX(height) as best_height + MAX(height) as best_height, + MIN(height) as start_height FROM block_info WHERE status&1 AND status&4 """ diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index 8683fea7f..73936640b 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -4,7 +4,7 @@ from sqlalchemy import table, bindparam, text, func, union from sqlalchemy.future import select from sqlalchemy.schema import CreateTable -from lbry.db.tables import Block as BlockTable, TX, TXO, TXI +from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Support from lbry.db.tables import ( pg_add_tx_constraints_and_indexes, pg_add_txo_constraints_and_indexes, @@ -190,3 +190,17 @@ def get_block_tx_addresses(block_hash=None, tx_hash=None): .where((TXI.c.address.isnot_(None)) & constraint), ) ) + + +@event_emitter("blockchain.sync.rewind.main", "steps") +def rewind(height: int, p: ProgressContext): + deletes = [ + BlockTable.delete().where(BlockTable.c.height >= height), + TXI.delete().where(TXI.c.height >= height), + TXO.delete().where(TXO.c.height >= height), + TX.delete().where(TX.c.height >= height), + Claim.delete().where(Claim.c.height >= height), + Support.delete().where(Support.c.height >= height), + ] + for delete in p.iter(deletes): + p.ctx.execute(delete) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 0da2d035a..003b24efa 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -103,11 +103,13 @@ class BlockchainSync(Sync): ))[0] tx_count += chain_file['txs'] block_count += chain_file['blocks'] + file_start_height = chain_file['start_height'] starting_height = min( - our_best_file_height+1 if starting_height is None else starting_height, our_best_file_height+1 + file_start_height if starting_height is None else starting_height, + file_start_height ) tasks.append(self.db.run( - block_phase.sync_block_file, chain_file['file_number'], our_best_file_height+1, + block_phase.sync_block_file, chain_file['file_number'], file_start_height, chain_file['txs'], self.TX_FLUSH_SIZE )) with Progress(self.db.message_queue, BLOCKS_MAIN_EVENT) as p: @@ -302,3 +304,6 @@ class BlockchainSync(Sync): except Exception as e: log.exception(e) await self.stop() + + async def rewind(self, height): + await self.db.run(block_phase.rewind, height) diff --git a/lbry/testcase.py b/lbry/testcase.py index 11c1ecf24..18e555f2e 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -892,7 +892,7 @@ class EventGenerator: def claims_stakes(self): yield from self.generate( - "blockchain.sync.claims.stakes", ("claims",), 0, None, (self.stakes,), (1,) + "blockchain.sync.claims.stakes", ("claims",), 0, None, (self.stakes,), (self.stakes,) ) def claims_vacuum(self): diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 6e9dc8073..920015943 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -400,6 +400,18 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): funded = await self.chain.fund_raw_transaction(hexlify(tx.raw).decode()) signed = await self.chain.sign_raw_transaction_with_wallet(funded['hex']) await self.chain.send_raw_transaction(signed['hex']) + tx = Transaction(unhexlify(signed['hex'])) + claim = None + for txo in tx.outputs: + if txo.is_claim: + claim = txo + break + support_tx = Transaction().add_outputs([ + Output.pay_support_pubkey_hash(CENT, claim.claim_name, claim.claim_id, address), + ]) + funded = await self.chain.fund_raw_transaction(hexlify(support_tx.raw).decode()) + signed = await self.chain.sign_raw_transaction_with_wallet(funded['hex']) + await self.chain.send_raw_transaction(signed['hex']) await self.chain.generate(1) # supports \w data aren't supported until block 350, fast forward a little @@ -430,12 +442,12 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): # get_block_files self.assertEqual( - [(0, 191, 280), (1, 89, 178), (2, 73, 86)], + [(0, 191, 369), (1, 89, 267), (2, 73, 98)], [(file['file_number'], file['blocks'], file['txs']) for file in await db.get_block_files()] ) self.assertEqual( - [(1, 29, 58)], + [(1, 29, 87)], [(file['file_number'], file['blocks'], file['txs']) for file in await db.get_block_files(1, 251)] ) @@ -465,7 +477,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): self.assertEqual(0, await db.get_claim_metadata_count(500, 1000)) # get_support_metadata_count - self.assertEqual(2, await db.get_support_metadata_count(0, 500)) + self.assertEqual(192, await db.get_support_metadata_count(0, 500)) self.assertEqual(0, await db.get_support_metadata_count(500, 1000)) # get_support_metadata @@ -473,7 +485,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): [{'name': b'two', 'activation_height': 359, 'expiration_height': 852}, {'name': b'two', 'activation_height': 359, 'expiration_height': 852}], [{'name': c['name'], 'activation_height': c['activation_height'], 'expiration_height': c['expiration_height']} - for c in await db.get_support_metadata(0, 500)] + for c in await db.get_support_metadata(350, 500)] ) @staticmethod @@ -507,9 +519,9 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): initial_sync=True, start=0, end=352, block_files=[ - (0, 191, 280, ((100, 0), (191, 280))), - (1, 89, 178, ((89, 178),)), - (2, 73, 86, ((73, 86),)), + (0, 191, 369, ((100, 0), (191, 369))), + (1, 89, 267, ((89, 267),)), + (2, 73, 98, ((73, 98),)), ], claims=[ (102, 120, 361, 361), @@ -524,7 +536,16 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): (273, 291, 361, 361), ], supports=[ - (352, 352, 2, 2), + (102, 121, 20, 20), + (122, 141, 20, 20), + (142, 160, 19, 19), + (161, 179, 19, 19), + (180, 198, 19, 19), + (199, 217, 19, 19), + (218, 236, 19, 19), + (237, 255, 19, 19), + (256, 274, 19, 19), + (275, 352, 19, 19), ] ).events) ) @@ -566,6 +587,30 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ).events) ) + # test non-initial sync across multiple files + await self.sync.rewind(250) + await asyncio.sleep(1) # give it time to collect events + events.clear() + await self.sync.advance() + await asyncio.sleep(1) # give it time to collect events + self.assertEqual( + self.sorted_events(events), + list(EventGenerator( + initial_sync=False, + start=250, end=354, + block_files=[ + (1, 30, 90, ((30, 90),)), + (2, 75, 102, ((75, 102),)), + ], + claims=[(250, 354, 799, 1084)], + takeovers=[(250, 354, 1, 1)], + stakes=43, + supports=[ + (250, 354, 45, 45), + ] + ).events) + ) + class TestGeneralBlockchainSync(SyncingBlockchainTestCase):