diff --git a/lbry/testcase.py b/lbry/testcase.py index b10ea9b27..31e7add4c 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -285,6 +285,17 @@ class IntegrationTestCase(AsyncioTestCase): lambda e: e.tx.id == tx.id and e.address == address ) + async def generate(self, blocks): + """ Ask lbrycrd to generate some blocks and wait until ledger has them. """ + print("generate", blocks) + prepare = self.ledger.on_header.where(self.blockchain.is_expected_block) + self.conductor.spv_node.server.synchronized.clear() + await self.blockchain.generate(blocks) + await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate + print("wait for synchronized") + await self.conductor.spv_node.server.synchronized.wait() + print("finished waiting for synchronized") + class FakeExchangeRateManager(ExchangeRateManager): @@ -456,8 +467,7 @@ class CommandTestCase(IntegrationTestCase): async def confirm_tx(self, txid, ledger=None): """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """ await self.on_transaction_id(txid, ledger) - await self.generate(1) - await self.on_transaction_id(txid, ledger) + await asyncio.wait([self.generate(1), self.on_transaction_id(txid, ledger)], timeout=5) return txid async def on_transaction_dict(self, tx): @@ -472,12 +482,6 @@ class CommandTestCase(IntegrationTestCase): addresses.add(txo['address']) return list(addresses) - async def generate(self, blocks): - """ Ask lbrycrd to generate some blocks and wait until ledger has them. """ - prepare = self.ledger.on_header.where(self.blockchain.is_expected_block) - await self.blockchain.generate(blocks) - await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate - async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True): txid = await self.blockchain._cli_cmnd('claimname', name, value, amount) if confirm: diff --git a/lbry/wallet/orchstr8/__init__.py b/lbry/wallet/orchstr8/__init__.py index 72791f2a3..5827bb95e 100644 --- a/lbry/wallet/orchstr8/__init__.py +++ b/lbry/wallet/orchstr8/__init__.py @@ -1,5 +1,5 @@ __hub_url__ = ( "https://github.com/lbryio/hub/releases/download/v0.2022.01.21.1/hub" ) -from .node import Conductor -from .service import ConductorService +from lbry.wallet.orchstr8.node import Conductor +from lbry.wallet.orchstr8.service import ConductorService diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 572331629..9f54dc4a7 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -16,11 +16,13 @@ import urllib.request from uuid import uuid4 import lbry -from lbry.wallet.server.server import Server from lbry.wallet.server.env import Env from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.conf import KnownHubsList, Config from lbry.wallet.orchstr8 import __hub_url__ +from lbry.wallet.server.block_processor import BlockProcessor +from lbry.wallet.server.chain_reader import BlockchainReaderServer +from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter log = logging.getLogger(__name__) @@ -189,49 +191,48 @@ class SPVNode: self.coin_class = coin_class self.controller = None self.data_path = None - self.server = None + self.server: Optional[BlockchainReaderServer] = None + self.writer: Optional[BlockProcessor] = None + self.es_writer: Optional[ElasticWriter] = None self.hostname = 'localhost' self.port = 50001 + node_number # avoid conflict with default daemon self.udp_port = self.port self.session_timeout = 600 - self.rpc_port = '0' # disabled by default self.stopped = False self.index_name = uuid4().hex async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): self.data_path = tempfile.mkdtemp() conf = { - 'DESCRIPTION': '', - 'PAYMENT_ADDRESS': '', - 'DAILY_FEE': '0', - 'DB_DIRECTORY': self.data_path, - 'DAEMON_URL': blockchain_node.rpc_url, - 'REORG_LIMIT': '100', - 'HOST': self.hostname, - 'TCP_PORT': str(self.port), - 'UDP_PORT': str(self.udp_port), - 'SESSION_TIMEOUT': str(self.session_timeout), - 'MAX_QUERY_WORKERS': '0', - 'INDIVIDUAL_TAG_INDEXES': '', - 'RPC_PORT': self.rpc_port, - 'ES_INDEX_PREFIX': self.index_name, - 'ES_MODE': 'writer', + 'description': '', + 'payment_address': '', + 'daily_fee': '0', + 'db_dir': self.data_path, + 'daemon_url': blockchain_node.rpc_url, + 'reorg_limit': 100, + 'host': self.hostname, + 'tcp_port': self.port, + 'udp_port': self.udp_port, + 'session_timeout': self.session_timeout, + 'max_query_workers': 0, + 'es_index_prefix': self.index_name, } if extraconf: conf.update(extraconf) - # TODO: don't use os.environ - os.environ.update(conf) - self.server = Server(Env(self.coin_class)) - self.server.bp.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5 - await self.server.start() + self.writer = BlockProcessor(Env(self.coin_class, es_mode='writer', **conf)) + self.server = BlockchainReaderServer(Env(self.coin_class, es_mode='reader', **conf)) + self.es_writer = ElasticWriter(Env(self.coin_class, es_mode='reader', **conf)) + await self.writer.open() + await self.writer.start() + await asyncio.wait([self.server.start(), self.es_writer.start()]) async def stop(self, cleanup=True): if self.stopped: return try: - await self.server.db.search_index.delete_index() - await self.server.db.search_index.stop() + await self.es_writer.stop(delete_index=True) await self.server.stop() + await self.writer.stop() self.stopped = True finally: cleanup and self.cleanup() diff --git a/scripts/initialize_hub_from_snapshot.sh b/scripts/initialize_hub_from_snapshot.sh index b0c09e1bc..5c7ae4e1b 100755 --- a/scripts/initialize_hub_from_snapshot.sh +++ b/scripts/initialize_hub_from_snapshot.sh @@ -1,12 +1,12 @@ #!/bin/bash -SNAPSHOT_HEIGHT="1049658" +SNAPSHOT_HEIGHT="1072108" HUB_VOLUME_PATH="/var/lib/docker/volumes/${USER}_wallet_server" ES_VOLUME_PATH="/var/lib/docker/volumes/${USER}_es01" -SNAPSHOT_TAR_NAME="wallet_server_snapshot_${SNAPSHOT_HEIGHT}.tar" -ES_SNAPSHOT_TAR_NAME="es_snapshot_${SNAPSHOT_HEIGHT}.tar" +SNAPSHOT_TAR_NAME="wallet_server_snapshot_${SNAPSHOT_HEIGHT}.tar.gz" +ES_SNAPSHOT_TAR_NAME="es_snapshot_${SNAPSHOT_HEIGHT}.tar.gz" SNAPSHOT_URL="https://snapshots.lbry.com/hub/${SNAPSHOT_TAR_NAME}" ES_SNAPSHOT_URL="https://snapshots.lbry.com/hub/${ES_SNAPSHOT_TAR_NAME}" diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 621655add..f0959997b 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -36,7 +36,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertEqual(self.ledger.headers.height, height) await self.assertBlockHash(height) await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) - await self.blockchain.generate(2) + await self.generate(2) await self.ledger.on_header.where(lambda e: e.height == 207) self.assertEqual(self.ledger.headers.height, 207) await self.assertBlockHash(206) @@ -45,14 +45,14 @@ class BlockchainReorganizationTests(CommandTestCase): # invalidate current block, move forward 3 await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) - await self.blockchain.generate(3) + await self.generate(3) await self.ledger.on_header.where(lambda e: e.height == 208) self.assertEqual(self.ledger.headers.height, 208) await self.assertBlockHash(206) await self.assertBlockHash(207) await self.assertBlockHash(208) self.assertEqual(2, bp.reorg_count_metric._samples()[0][2]) - await self.blockchain.generate(3) + await self.generate(3) await self.ledger.on_header.where(lambda e: e.height == 211) await self.assertBlockHash(209) await self.assertBlockHash(210) @@ -61,7 +61,7 @@ class BlockchainReorganizationTests(CommandTestCase): 'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!') ) await self.ledger.wait(still_valid) - await self.blockchain.generate(1) + await self.generate(1) await self.ledger.on_header.where(lambda e: e.height == 212) claim_id = still_valid.outputs[0].claim_id c1 = (await self.resolve(f'still-valid#{claim_id}'))['claim_id'] @@ -70,7 +70,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertTrue(c1 == c2 == c3) abandon_tx = await self.daemon.jsonrpc_stream_abandon(claim_id=claim_id) - await self.blockchain.generate(1) + await self.generate(1) await self.ledger.on_header.where(lambda e: e.height == 213) c1 = await self.resolve(f'still-valid#{still_valid.outputs[0].claim_id}') c2 = await self.daemon.jsonrpc_resolve([f'still-valid#{claim_id[:2]}']) @@ -113,7 +113,7 @@ class BlockchainReorganizationTests(CommandTestCase): # reorg the last block dropping our claim tx await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.clear_mempool() - await self.blockchain.generate(2) + await self.generate(2) # wait for the client to catch up and verify the reorg await asyncio.wait_for(self.on_header(209), 3.0) @@ -142,7 +142,7 @@ class BlockchainReorganizationTests(CommandTestCase): # broadcast the claim in a different block new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode()) self.assertEqual(broadcast_tx.id, new_txid) - await self.blockchain.generate(1) + await self.generate(1) # wait for the client to catch up await asyncio.wait_for(self.on_header(210), 1.0) @@ -192,7 +192,7 @@ class BlockchainReorganizationTests(CommandTestCase): # reorg the last block dropping our claim tx await self.blockchain.invalidate_block(invalidated_block_hash) await self.blockchain.clear_mempool() - await self.blockchain.generate(2) + await self.generate(2) # wait for the client to catch up and verify the reorg await asyncio.wait_for(self.on_header(209), 3.0) @@ -221,7 +221,7 @@ class BlockchainReorganizationTests(CommandTestCase): # broadcast the claim in a different block new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode()) self.assertEqual(broadcast_tx.id, new_txid) - await self.blockchain.generate(1) + await self.generate(1) # wait for the client to catch up await asyncio.wait_for(self.on_header(210), 1.0) diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index e5cc725cc..cbb61eec9 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -16,7 +16,7 @@ class NetworkTests(IntegrationTestCase): async def test_remote_height_updated_automagically(self): initial_height = self.ledger.network.remote_height - await self.blockchain.generate(1) + await self.generate(1) await self.ledger.network.on_header.first self.assertEqual(self.ledger.network.remote_height, initial_height + 1) @@ -85,8 +85,8 @@ class ReconnectTests(IntegrationTestCase): async def test_direct_sync(self): await self.ledger.stop() initial_height = self.ledger.local_height_including_downloaded_height - await self.blockchain.generate(100) - while self.conductor.spv_node.server.session_mgr.notified_height < initial_height + 99: # off by 1 + await self.generate(100) + while self.conductor.spv_node.server.session_manager.notified_height < initial_height + 99: # off by 1 await asyncio.sleep(0.1) self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height) await self.ledger.headers.open() @@ -105,7 +105,7 @@ class ReconnectTests(IntegrationTestCase): # await self.ledger.resolve([], 'derp') # self.assertTrue(self.ledger.network.is_connected) await asyncio.wait_for(self.on_transaction_id(sendtxid), 10.0) # mempool - await self.blockchain.generate(1) + await self.generate(1) await self.on_transaction_id(sendtxid) # confirmed self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine @@ -123,7 +123,7 @@ class ReconnectTests(IntegrationTestCase): await self.ledger.network.get_transaction(sendtxid) # * goes to pick some water outside... * time passes by and another donation comes in sendtxid = await self.blockchain.send_to_address(address1, 42) - await self.blockchain.generate(1) + await self.generate(1) # (this is just so the test doesn't hang forever if it doesn't reconnect) if not self.ledger.network.is_connected: await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0) @@ -169,7 +169,7 @@ class UDPServerFailDiscoveryTest(AsyncioTestCase): self.addCleanup(conductor.stop_blockchain) await conductor.start_spv() self.addCleanup(conductor.stop_spv) - self.assertFalse(conductor.spv_node.server.bp.status_server.is_running) + self.assertFalse(conductor.spv_node.server.reader.status_server.is_running) await asyncio.wait_for(conductor.start_wallet(), timeout=5) self.addCleanup(conductor.stop_wallet) self.assertTrue(conductor.wallet_node.ledger.network.is_connected) diff --git a/tests/integration/blockchain/test_sync.py b/tests/integration/blockchain/test_sync.py index 7af2bd1aa..9fc1f2ed2 100644 --- a/tests/integration/blockchain/test_sync.py +++ b/tests/integration/blockchain/test_sync.py @@ -63,7 +63,7 @@ class SyncTests(IntegrationTestCase): await self.assertBalance(account1, '1.0') await self.assertBalance(account2, '1.0') - await self.blockchain.generate(1) + await self.generate(1) # pay 0.01 from main node to receiving node, would have increased change addresses address0 = (await account0.receiving.get_addresses())[0] @@ -79,7 +79,7 @@ class SyncTests(IntegrationTestCase): account1.ledger.wait(tx), account2.ledger.wait(tx), ]) - await self.blockchain.generate(1) + await self.generate(1) await asyncio.wait([ account0.ledger.wait(tx), account1.ledger.wait(tx), @@ -92,7 +92,7 @@ class SyncTests(IntegrationTestCase): await self.assertBalance(account1, '0.989876') await self.assertBalance(account2, '0.989876') - await self.blockchain.generate(1) + await self.generate(1) # create a new mirror node and see if it syncs to same balance from scratch node3 = await self.make_wallet_node(account1.seed) diff --git a/tests/integration/blockchain/test_wallet_commands.py b/tests/integration/blockchain/test_wallet_commands.py index 81c98865f..2cc25d0ba 100644 --- a/tests/integration/blockchain/test_wallet_commands.py +++ b/tests/integration/blockchain/test_wallet_commands.py @@ -11,7 +11,7 @@ from lbry.wallet.dewies import dict_values_to_lbc class WalletCommands(CommandTestCase): async def test_wallet_create_and_add_subscribe(self): - session = next(iter(self.conductor.spv_node.server.session_mgr.sessions.values())) + session = next(iter(self.conductor.spv_node.server.session_manager.sessions.values())) self.assertEqual(len(session.hashX_subs), 27) wallet = await self.daemon.jsonrpc_wallet_create('foo', create_account=True, single_key=True) self.assertEqual(len(session.hashX_subs), 28) diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 139a0bf0b..4aff4634e 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -25,13 +25,13 @@ class TestSessions(IntegrationTestCase): ) await session.create_connection() await session.send_request('server.banner', ()) - self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) + self.assertEqual(len(self.conductor.spv_node.server.session_manager.sessions), 1) self.assertFalse(session.is_closing()) await asyncio.sleep(1.1) with self.assertRaises(asyncio.TimeoutError): await session.send_request('server.banner', ()) self.assertTrue(session.is_closing()) - self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0) + self.assertEqual(len(self.conductor.spv_node.server.session_manager.sessions), 0) async def test_proper_version(self): info = await self.ledger.network.get_server_features() @@ -186,7 +186,7 @@ class TestHubDiscovery(CommandTestCase): self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', kp_final_node.port) ) - kp_final_node.server.session_mgr._notify_peer('127.0.0.1:9988') + kp_final_node.server.session_manager._notify_peer('127.0.0.1:9988') await self.daemon.ledger.network.on_hub.first await asyncio.sleep(0.5) # wait for above event to be processed by other listeners self.assertEqual( diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index 7856270ed..563adb7e7 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -23,7 +23,7 @@ class BaseResolveTestCase(CommandTestCase): def assertMatchESClaim(self, claim_from_es, claim_from_db): self.assertEqual(claim_from_es['claim_hash'][::-1].hex(), claim_from_db.claim_hash.hex()) self.assertEqual(claim_from_es['claim_id'], claim_from_db.claim_hash.hex()) - self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height) + self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height, f"es height: {claim_from_es['activation_height']}, rocksdb height: {claim_from_db.activation_height}") self.assertEqual(claim_from_es['last_take_over_height'], claim_from_db.last_takeover_height) self.assertEqual(claim_from_es['tx_id'], claim_from_db.tx_hash[::-1].hex()) self.assertEqual(claim_from_es['tx_nout'], claim_from_db.position) @@ -44,44 +44,44 @@ class BaseResolveTestCase(CommandTestCase): if claim_id is None: self.assertIn('error', other) self.assertEqual(other['error']['name'], 'NOT_FOUND') - claims_from_es = (await self.conductor.spv_node.server.bp.db.search_index.search(name=name))[0] + claims_from_es = (await self.conductor.spv_node.server.session_manager.search_index.search(name=name))[0] claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es] self.assertNotIn(claim_id, claims_from_es) else: - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id) + claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(claim_id=claim_id) self.assertEqual(claim_id, other['claim_id']) self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex()) async def assertNoClaimForName(self, name: str): lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) - stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) + stream, channel, _, _ = await self.conductor.spv_node.server.db.resolve(name) self.assertNotIn('claimId', lbrycrd_winning) if stream is not None: self.assertIsInstance(stream, LookupError) else: self.assertIsInstance(channel, LookupError) - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name) + claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(name=name) self.assertListEqual([], claim_from_es[0]) async def assertNoClaim(self, claim_id: str): self.assertDictEqual( {}, json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) ) - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id) + claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search(claim_id=claim_id) self.assertListEqual([], claim_from_es[0]) - claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) + claim = await self.conductor.spv_node.server.db.fs_getclaimbyid(claim_id) self.assertIsNone(claim) async def assertMatchWinningClaim(self, name): expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) - stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) + stream, channel, _, _ = await self.conductor.spv_node.server.db.resolve(name) claim = stream if stream else channel await self._assertMatchClaim(expected, claim) return claim async def _assertMatchClaim(self, expected, claim): self.assertMatchDBClaim(expected, claim) - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( + claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search( claim_id=claim.claim_hash.hex() ) self.assertEqual(len(claim_from_es[0]), 1) @@ -90,7 +90,7 @@ class BaseResolveTestCase(CommandTestCase): async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True): expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) - claim = await self.conductor.spv_node.server.bp.db.fs_getclaimbyid(claim_id) + claim = await self.conductor.spv_node.server.db.fs_getclaimbyid(claim_id) if is_active_in_lbrycrd: if not expected: self.assertIsNone(claim) @@ -98,7 +98,7 @@ class BaseResolveTestCase(CommandTestCase): self.assertMatchDBClaim(expected, claim) else: self.assertDictEqual({}, expected) - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( + claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search( claim_id=claim.claim_hash.hex() ) self.assertEqual(len(claim_from_es[0]), 1) @@ -116,7 +116,7 @@ class BaseResolveTestCase(CommandTestCase): def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True): total_amount = 0 - db = self.conductor.spv_node.server.bp.db + db = self.conductor.spv_node.server.db for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): total_amount += amount @@ -131,7 +131,7 @@ class BaseResolveTestCase(CommandTestCase): async def assertMatchClaimsForName(self, name): expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name)) - db = self.conductor.spv_node.server.bp.db + db = self.conductor.spv_node.server.db # self.assertEqual(len(expected['claims']), len(db_claims.claims)) # self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight) last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight'] @@ -143,7 +143,7 @@ class BaseResolveTestCase(CommandTestCase): claim = db._fs_get_claim_by_hash(claim_hash) self.assertMatchDBClaim(c, claim) - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( + claim_from_es = await self.conductor.spv_node.server.session_manager.search_index.search( claim_id=c['claimId'] ) self.assertEqual(len(claim_from_es[0]), 1) @@ -151,6 +151,17 @@ class BaseResolveTestCase(CommandTestCase): self.assertMatchESClaim(claim_from_es[0][0], claim) self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount']) + async def assertNameState(self, height: int, name: str, winning_claim_id: str, last_takeover_height: int, + non_winning_claims: List[ClaimStateValue]): + self.assertEqual(height, self.conductor.spv_node.server.db.db_height) + await self.assertMatchClaimIsWinning(name, winning_claim_id) + for non_winning in non_winning_claims: + claim = await self.assertMatchClaim( + non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd + ) + self.assertEqual(non_winning.activation_height, claim.activation_height) + self.assertEqual(last_takeover_height, claim.last_takeover_height) + class ResolveCommand(BaseResolveTestCase): async def test_colliding_short_id(self): @@ -641,17 +652,6 @@ class ResolveClaimTakeovers(BaseResolveTestCase): async def create_stream_claim(self, amount: str, name='derp') -> str: return (await self.stream_create(name, amount, allow_duplicate_name=True))['outputs'][0]['claim_id'] - async def assertNameState(self, height: int, name: str, winning_claim_id: str, last_takeover_height: int, - non_winning_claims: List[ClaimStateValue]): - self.assertEqual(height, self.conductor.spv_node.server.bp.db.db_height) - await self.assertMatchClaimIsWinning(name, winning_claim_id) - for non_winning in non_winning_claims: - claim = await self.assertMatchClaim( - non_winning.claim_id, is_active_in_lbrycrd=non_winning.active_in_lbrycrd - ) - self.assertEqual(non_winning.activation_height, claim.activation_height) - self.assertEqual(last_takeover_height, claim.last_takeover_height) - async def test_delay_takeover_with_update(self): name = 'derp' first_claim_id = await self.create_stream_claim('0.2', name) @@ -961,7 +961,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): ) greater_than_or_equal_to_zero = [ claim['claim_id'] for claim in ( - await self.conductor.spv_node.server.bp.db.search_index.search( + await self.conductor.spv_node.server.session_manager.search_index.search( channel_id=channel_id, fee_amount=">=0" ))[0] ] @@ -969,7 +969,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): self.assertSetEqual(set(greater_than_or_equal_to_zero), {stream_with_no_fee, stream_with_fee}) greater_than_zero = [ claim['claim_id'] for claim in ( - await self.conductor.spv_node.server.bp.db.search_index.search( + await self.conductor.spv_node.server.session_manager.search_index.search( channel_id=channel_id, fee_amount=">0" ))[0] ] @@ -977,7 +977,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): self.assertSetEqual(set(greater_than_zero), {stream_with_fee}) equal_to_zero = [ claim['claim_id'] for claim in ( - await self.conductor.spv_node.server.bp.db.search_index.search( + await self.conductor.spv_node.server.session_manager.search_index.search( channel_id=channel_id, fee_amount="<=0" ))[0] ] @@ -995,7 +995,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): await self.blockchain.send_to_address(address, 400.0) await self.account.ledger.on_address.first await self.generate(100) - self.assertEqual(800, self.conductor.spv_node.server.bp.db.db_height) + self.assertEqual(800, self.conductor.spv_node.server.db.db_height) # Block 801: Claim A for 10 LBC is accepted. # It is the first claim, so it immediately becomes active and controlling. @@ -1007,10 +1007,10 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # Its activation height is 1121 + min(4032, floor((1121-801) / 32)) = 1121 + 10 = 1131. # State: A(10) is controlling, B(20) is accepted. await self.generate(32 * 10 - 1) - self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height) + self.assertEqual(1120, self.conductor.spv_node.server.db.db_height) claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - claim_B, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_B}") - self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height) + claim_B, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_B}") + self.assertEqual(1121, self.conductor.spv_node.server.db.db_height) self.assertEqual(1131, claim_B.activation_height) await self.assertMatchClaimIsWinning(name, claim_id_A) @@ -1018,33 +1018,33 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # Since it is a support for the controlling claim, it activates immediately. # State: A(10+14) is controlling, B(20) is accepted. await self.support_create(claim_id_A, bid='14.0') - self.assertEqual(1122, self.conductor.spv_node.server.bp.db.db_height) + self.assertEqual(1122, self.conductor.spv_node.server.db.db_height) await self.assertMatchClaimIsWinning(name, claim_id_A) # Block 1123: Claim C for 50 LBC is accepted. # The activation height is 1123 + min(4032, floor((1123-801) / 32)) = 1123 + 10 = 1133. # State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted. claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height) - claim_C, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_C}") + self.assertEqual(1123, self.conductor.spv_node.server.db.db_height) + claim_C, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_C}") self.assertEqual(1133, claim_C.activation_height) await self.assertMatchClaimIsWinning(name, claim_id_A) await self.generate(7) - self.assertEqual(1130, self.conductor.spv_node.server.bp.db.db_height) + self.assertEqual(1130, self.conductor.spv_node.server.db.db_height) await self.assertMatchClaimIsWinning(name, claim_id_A) await self.generate(1) # Block 1131: Claim B activates. It has 20 LBC, while claim A has 24 LBC (10 original + 14 from support X). There is no takeover, and claim A remains controlling. # State: A(10+14) is controlling, B(20) is active, C(50) is accepted. - self.assertEqual(1131, self.conductor.spv_node.server.bp.db.db_height) + self.assertEqual(1131, self.conductor.spv_node.server.db.db_height) await self.assertMatchClaimIsWinning(name, claim_id_A) # Block 1132: Claim D for 300 LBC is accepted. The activation height is 1132 + min(4032, floor((1132-801) / 32)) = 1132 + 10 = 1142. # State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted. claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height) - claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}") + self.assertEqual(1132, self.conductor.spv_node.server.db.db_height) + claim_D, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_D}") self.assertEqual(False, claim_D.is_controlling) self.assertEqual(801, claim_D.last_takeover_height) self.assertEqual(1142, claim_D.activation_height) @@ -1053,8 +1053,8 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # Block 1133: Claim C activates. It has 50 LBC, while claim A has 24 LBC, so a takeover is initiated. The takeover height for this name is set to 1133, and therefore the activation delay for all the claims becomes min(4032, floor((1133-1133) / 32)) = 0. All the claims become active. The totals for each claim are recalculated, and claim D becomes controlling because it has the highest total. # State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling await self.generate(1) - self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height) - claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}") + self.assertEqual(1133, self.conductor.spv_node.server.db.db_height) + claim_D, _, _, _ = await self.conductor.spv_node.server.db.resolve(f"{name}:{claim_id_D}") self.assertEqual(True, claim_D.is_controlling) self.assertEqual(1133, claim_D.last_takeover_height) self.assertEqual(1133, claim_D.activation_height) @@ -1407,12 +1407,12 @@ class ResolveClaimTakeovers(BaseResolveTestCase): second_claim_id = (await self.stream_create(name, '0.01', allow_duplicate_name=True))['outputs'][0]['claim_id'] await self.assertNoClaim(second_claim_id) self.assertEqual( - len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 1 + len((await self.conductor.spv_node.server.session_manager.search_index.search(claim_name=name))[0]), 1 ) await self.generate(1) await self.assertMatchClaim(second_claim_id) self.assertEqual( - len((await self.conductor.spv_node.server.bp.db.search_index.search(claim_name=name))[0]), 2 + len((await self.conductor.spv_node.server.session_manager.search_index.search(claim_name=name))[0]), 2 ) async def test_abandon_controlling_same_block_as_new_claim(self): @@ -1428,7 +1428,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): async def test_trending(self): async def get_trending_score(claim_id): - return (await self.conductor.spv_node.server.bp.db.search_index.search( + return (await self.conductor.spv_node.server.session_manager.search_index.search( claim_id=claim_id ))[0][0]['trending_score'] @@ -1456,7 +1456,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): ) await self.generate(1) self.assertEqual(-174.951347102643, await get_trending_score(claim_id1)) - search_results = (await self.conductor.spv_node.server.bp.db.search_index.search(claim_name="derp"))[0] + search_results = (await self.conductor.spv_node.server.session_manager.search_index.search(claim_name="derp"))[0] self.assertEqual(1, len(search_results)) self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results]) @@ -1465,22 +1465,31 @@ class ResolveAfterReorg(BaseResolveTestCase): async def reorg(self, start): blocks = self.ledger.headers.height - start self.blockchain.block_expected = start - 1 + + + prepare = self.ledger.on_header.where(self.blockchain.is_expected_block) + self.conductor.spv_node.server.synchronized.clear() + # go back to start - await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode()) + print('invalidate block', await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode())) # go to previous + 1 - await self.generate(blocks + 2) + await self.blockchain.generate(blocks + 2) + + await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate + await self.conductor.spv_node.server.synchronized.wait() + # await asyncio.wait_for(self.on_header(self.blockchain.block_expected), 30.0) async def assertBlockHash(self, height): - bp = self.conductor.spv_node.server.bp + reader_db = self.conductor.spv_node.server.db block_hash = await self.blockchain.get_block_hash(height) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) - self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) + self.assertEqual(block_hash, (await reader_db.fs_block_hashes(height, 1))[0][::-1].hex()) txids = [ - tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height) + tx_hash[::-1].hex() for tx_hash in reader_db.get_block_txs(height) ] - txs = await bp.db.get_transactions_and_merkles(txids) - block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] + txs = await reader_db.get_transactions_and_merkles(txids) + block_txs = (await self.conductor.spv_node.server.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') @@ -1491,9 +1500,18 @@ class ResolveAfterReorg(BaseResolveTestCase): channel_id = self.get_claim_id( await self.channel_create(channel_name, '0.01') ) - self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex()) + + await self.assertNameState( + height=207, name='@abc', winning_claim_id=channel_id, last_takeover_height=207, + non_winning_claims=[] + ) + await self.reorg(206) - self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex()) + + await self.assertNameState( + height=208, name='@abc', winning_claim_id=channel_id, last_takeover_height=207, + non_winning_claims=[] + ) # await self.assertNoClaimForName(channel_name) # self.assertNotIn('error', await self.resolve(channel_name)) @@ -1502,16 +1520,29 @@ class ResolveAfterReorg(BaseResolveTestCase): stream_id = self.get_claim_id( await self.stream_create(stream_name, '0.01', channel_id=channel_id) ) - self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) + + await self.assertNameState( + height=209, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209, + non_winning_claims=[] + ) await self.reorg(206) - self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) + await self.assertNameState( + height=210, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209, + non_winning_claims=[] + ) await self.support_create(stream_id, '0.01') - self.assertNotIn('error', await self.resolve(stream_name)) - self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) + + await self.assertNameState( + height=211, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209, + non_winning_claims=[] + ) await self.reorg(206) # self.assertNotIn('error', await self.resolve(stream_name)) - self.assertEqual(stream_id, (await self.assertMatchWinningClaim(stream_name)).claim_hash.hex()) + await self.assertNameState( + height=212, name=stream_name, winning_claim_id=stream_id, last_takeover_height=209, + non_winning_claims=[] + ) await self.stream_abandon(stream_id) self.assertNotIn('error', await self.resolve(channel_name)) @@ -1553,7 +1584,6 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.ledger.wait(broadcast_tx) await self.support_create(still_valid.outputs[0].claim_id, '0.01') - # await self.generate(1) await self.ledger.wait(broadcast_tx, self.blockchain.block_expected) self.assertEqual(self.ledger.headers.height, 208) await self.assertBlockHash(208) @@ -1574,7 +1604,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(2) # wait for the client to catch up and verify the reorg - await asyncio.wait_for(self.on_header(209), 3.0) + await asyncio.wait_for(self.on_header(209), 30.0) await self.assertBlockHash(207) await self.assertBlockHash(208) await self.assertBlockHash(209) @@ -1603,7 +1633,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(1) # wait for the client to catch up - await asyncio.wait_for(self.on_header(210), 1.0) + await asyncio.wait_for(self.on_header(210), 30.0) # verify the claim is in the new block and that it is returned by claim_search republished = await self.resolve('hovercraft') @@ -1653,7 +1683,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(2) # wait for the client to catch up and verify the reorg - await asyncio.wait_for(self.on_header(209), 3.0) + await asyncio.wait_for(self.on_header(209), 30.0) await self.assertBlockHash(207) await self.assertBlockHash(208) await self.assertBlockHash(209) @@ -1682,7 +1712,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(1) # wait for the client to catch up - await asyncio.wait_for(self.on_header(210), 1.0) + await asyncio.wait_for(self.on_header(210), 10.0) # verify the claim is in the new block and that it is returned by claim_search republished = await self.resolve('hovercraft') diff --git a/tests/integration/transactions/test_internal_transaction_api.py b/tests/integration/transactions/test_internal_transaction_api.py index 142009ba4..e17cffa7b 100644 --- a/tests/integration/transactions/test_internal_transaction_api.py +++ b/tests/integration/transactions/test_internal_transaction_api.py @@ -23,7 +23,7 @@ class BasicTransactionTest(IntegrationTestCase): )) sendtxid1 = await self.blockchain.send_to_address(address1, 5) sendtxid2 = await self.blockchain.send_to_address(address2, 5) - await self.blockchain.generate(1) + await self.generate(1) await notifications self.assertEqual(d2l(await self.account.get_balance()), '10.0') @@ -57,7 +57,7 @@ class BasicTransactionTest(IntegrationTestCase): notifications = asyncio.create_task(asyncio.wait( [asyncio.ensure_future(self.ledger.wait(channel_tx)), asyncio.ensure_future(self.ledger.wait(stream_tx))] )) - await self.blockchain.generate(1) + await self.generate(1) await notifications self.assertEqual(d2l(await self.account.get_balance()), '7.985786') self.assertEqual(d2l(await self.account.get_balance(include_claims=True)), '9.985786') @@ -70,7 +70,7 @@ class BasicTransactionTest(IntegrationTestCase): await self.broadcast(abandon_tx) await notify notify = asyncio.create_task(self.ledger.wait(abandon_tx)) - await self.blockchain.generate(1) + await self.generate(1) await notify response = await self.ledger.resolve([], ['lbry://@bar/foo']) diff --git a/tests/integration/transactions/test_transactions.py b/tests/integration/transactions/test_transactions.py index fea0b18fb..595ff86b0 100644 --- a/tests/integration/transactions/test_transactions.py +++ b/tests/integration/transactions/test_transactions.py @@ -9,9 +9,8 @@ from lbry.wallet.manager import WalletManager class BasicTransactionTests(IntegrationTestCase): - async def test_variety_of_transactions_and_longish_history(self): - await self.blockchain.generate(300) + await self.generate(300) await self.assertBalance(self.account, '0.0') addresses = await self.account.receiving.get_addresses() @@ -57,7 +56,7 @@ class BasicTransactionTests(IntegrationTestCase): for tx in await self.ledger.db.get_transactions(txid__in=[tx.id for tx in txs]) ])) - await self.blockchain.generate(1) + await self.generate(1) await asyncio.wait([self.ledger.wait(tx) for tx in txs]) await self.assertBalance(self.account, '199.99876') @@ -74,7 +73,7 @@ class BasicTransactionTests(IntegrationTestCase): ) await self.broadcast(tx) await self.ledger.wait(tx) - await self.blockchain.generate(1) + await self.generate(1) await self.ledger.wait(tx) self.assertEqual(2, await self.account.get_utxo_count()) # 199 + change @@ -92,7 +91,7 @@ class BasicTransactionTests(IntegrationTestCase): self.blockchain.send_to_address(address, 1.1) for address in addresses[:5] )) await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # mempool - await self.blockchain.generate(1) + await self.generate(1) await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) # confirmed await self.assertBalance(account1, '5.5') await self.assertBalance(account2, '0.0') @@ -107,7 +106,7 @@ class BasicTransactionTests(IntegrationTestCase): ) await self.broadcast(tx) await self.ledger.wait(tx) # mempool - await self.blockchain.generate(1) + await self.generate(1) await self.ledger.wait(tx) # confirmed await self.assertBalance(account1, '3.499802') @@ -121,7 +120,7 @@ class BasicTransactionTests(IntegrationTestCase): ) await self.broadcast(tx) await self.ledger.wait(tx) # mempool - await self.blockchain.generate(1) + await self.generate(1) await self.ledger.wait(tx) # confirmed tx = (await account1.get_transactions(include_is_my_input=True, include_is_my_output=True))[1] @@ -133,11 +132,11 @@ class BasicTransactionTests(IntegrationTestCase): self.assertTrue(tx.outputs[1].is_internal_transfer) async def test_history_edge_cases(self): - await self.blockchain.generate(300) + await self.generate(300) await self.assertBalance(self.account, '0.0') address = await self.account.receiving.get_or_create_usable_address() # evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it - original_summary = self.conductor.spv_node.server.bp.mempool.transaction_summaries + original_summary = self.conductor.spv_node.server.mempool.transaction_summaries def random_summary(*args, **kwargs): summary = original_summary(*args, **kwargs) @@ -146,7 +145,7 @@ class BasicTransactionTests(IntegrationTestCase): while summary == ordered: random.shuffle(summary) return summary - self.conductor.spv_node.server.bp.mempool.transaction_summaries = random_summary + self.conductor.spv_node.server.mempool.transaction_summaries = random_summary # 10 unconfirmed txs, all from blockchain wallet sends = [self.blockchain.send_to_address(address, 10) for _ in range(10)] # use batching to reduce issues with send_to_address on cli @@ -199,7 +198,7 @@ class BasicTransactionTests(IntegrationTestCase): async def test_sqlite_coin_chooser(self): wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger}) - await self.blockchain.generate(300) + await self.generate(300) await self.assertBalance(self.account, '0.0') address = await self.account.receiving.get_or_create_usable_address() diff --git a/tests/unit/wallet/server/test_revertable.py b/tests/unit/wallet/server/test_revertable.py index 79b4cdb0c..b3fe03a57 100644 --- a/tests/unit/wallet/server/test_revertable.py +++ b/tests/unit/wallet/server/test_revertable.py @@ -126,28 +126,28 @@ class TestRevertablePrefixDB(unittest.TestCase): self.assertIsNone(self.db.claim_takeover.get(name)) self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height) - self.db.commit(10000000) + self.db.commit(10000000, b'\x00' * 32) self.assertEqual(10000000, self.db.claim_takeover.get(name).height) self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height)) self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1)) self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1)) - self.db.commit(10000001) + self.db.commit(10000001, b'\x01' * 32) self.assertIsNone(self.db.claim_takeover.get(name)) self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2)) - self.db.commit(10000002) + self.db.commit(10000002, b'\x02' * 32) self.assertEqual(10000002, self.db.claim_takeover.get(name).height) self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2)) self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3)) - self.db.commit(10000003) + self.db.commit(10000003, b'\x03' * 32) self.assertEqual(10000003, self.db.claim_takeover.get(name).height) - self.db.rollback(10000003) + self.db.rollback(10000003, b'\x03' * 32) self.assertEqual(10000002, self.db.claim_takeover.get(name).height) - self.db.rollback(10000002) + self.db.rollback(10000002, b'\x02' * 32) self.assertIsNone(self.db.claim_takeover.get(name)) - self.db.rollback(10000001) + self.db.rollback(10000001, b'\x01' * 32) self.assertEqual(10000000, self.db.claim_takeover.get(name).height) - self.db.rollback(10000000) + self.db.rollback(10000000, b'\x00' * 32) self.assertIsNone(self.db.claim_takeover.get(name))