This commit is contained in:
Jack Robison 2020-11-17 10:57:27 -05:00
parent 13b473403e
commit 730a67c8d6
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 24 additions and 23 deletions

View file

@ -75,7 +75,7 @@ class ReconnectTests(IntegrationTestCase):
session.trigger_urgent_reconnect.set() session.trigger_urgent_reconnect.set()
await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1)
self.assertEqual(2, len(list(self.ledger.network.session_pool.available_sessions))) self.assertEqual(2, len(list(self.ledger.network.session_pool.available_sessions)))
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected())
switch_event = self.ledger.network.on_connected.first switch_event = self.ledger.network.on_connected.first
await node2.stop(True) await node2.stop(True)
# secondary down, but primary is ok, do not switch! (switches trigger new on_connected events) # secondary down, but primary is ok, do not switch! (switches trigger new on_connected events)
@ -99,7 +99,7 @@ class ReconnectTests(IntegrationTestCase):
address1 = await self.account.receiving.get_or_create_usable_address() address1 = await self.account.receiving.get_or_create_usable_address()
# disconnect and send a new tx, should reconnect and get it # disconnect and send a new tx, should reconnect and get it
self.ledger.network.client.connection_lost(Exception()) self.ledger.network.client.connection_lost(Exception())
self.assertFalse(self.ledger.network.is_connected) self.assertFalse(self.ledger.network.is_connected())
sendtxid = await self.blockchain.send_to_address(address1, 1.1337) sendtxid = await self.blockchain.send_to_address(address1, 1.1337)
await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool
await self.blockchain.generate(1) await self.blockchain.generate(1)
@ -122,7 +122,7 @@ class ReconnectTests(IntegrationTestCase):
sendtxid = await self.blockchain.send_to_address(address1, 42) sendtxid = await self.blockchain.send_to_address(address1, 42)
await self.blockchain.generate(1) await self.blockchain.generate(1)
# (this is just so the test doesn't hang forever if it doesn't reconnect) # (this is just so the test doesn't hang forever if it doesn't reconnect)
if not self.ledger.network.is_connected: if not self.ledger.network.is_connected():
await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0) await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0)
# omg, the burned cable still works! torba is fire proof! # omg, the burned cable still works! torba is fire proof!
await self.ledger.network.get_transaction(sendtxid) await self.ledger.network.get_transaction(sendtxid)
@ -130,11 +130,11 @@ class ReconnectTests(IntegrationTestCase):
async def test_timeout_then_reconnect(self): async def test_timeout_then_reconnect(self):
# tests that it connects back after some failed attempts # tests that it connects back after some failed attempts
await self.conductor.spv_node.stop() await self.conductor.spv_node.stop()
self.assertFalse(self.ledger.network.is_connected) self.assertFalse(self.ledger.network.is_connected())
await asyncio.sleep(0.2) # let it retry and fail once await asyncio.sleep(0.2) # let it retry and fail once
await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.conductor.spv_node.start(self.conductor.blockchain_node)
await self.ledger.network.on_connected.first await self.ledger.network.on_connected.first
self.assertTrue(self.ledger.network.is_connected) self.assertTrue(self.ledger.network.is_connected())
async def test_online_but_still_unavailable(self): async def test_online_but_still_unavailable(self):
# Edge case. See issue #2445 for context # Edge case. See issue #2445 for context
@ -184,7 +184,7 @@ class ServerPickingTestCase(AsyncioTestCase):
self.addCleanup(network.stop) self.addCleanup(network.stop)
asyncio.ensure_future(network.start()) asyncio.ensure_future(network.start())
await asyncio.wait_for(network.on_connected.first, timeout=1) await asyncio.wait_for(network.on_connected.first, timeout=1)
self.assertTrue(network.is_connected) self.assertTrue(network.is_connected())
self.assertTupleEqual(network.client.server, ('127.0.0.1', 1337)) self.assertTupleEqual(network.client.server, ('127.0.0.1', 1337))
self.assertTrue(all([not session.is_closing() for session in network.session_pool.available_sessions])) self.assertTrue(all([not session.is_closing() for session in network.session_pool.available_sessions]))
# ensure we are connected to all of them after a while # ensure we are connected to all of them after a while

View file

@ -152,8 +152,11 @@ class BasicTransactionTests(IntegrationTestCase):
for batch in range(0, len(sends), 10): for batch in range(0, len(sends), 10):
txids = await asyncio.gather(*sends[batch:batch + 10]) txids = await asyncio.gather(*sends[batch:batch + 10])
await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) await asyncio.wait([self.on_transaction_id(txid) for txid in txids])
remote_status = await self.ledger.network.subscribe_address(address)
self.assertTrue(await self.ledger.update_history(address, remote_status)) client = self.ledger.network.get_wallet_session(self.account.wallet)
remote_status = await self.ledger.network.subscribe_address(client, address)
self.assertTrue(await self.ledger.update_history(client, address, remote_status))
# 20 unconfirmed txs, 10 from blockchain, 10 from local to local # 20 unconfirmed txs, 10 from blockchain, 10 from local to local
utxos = await self.account.get_utxos() utxos = await self.account.get_utxos()
txs = [] txs = []
@ -166,12 +169,12 @@ class BasicTransactionTests(IntegrationTestCase):
await self.broadcast(tx) await self.broadcast(tx)
txs.append(tx) txs.append(tx)
await asyncio.wait([self.on_transaction_address(tx, address) for tx in txs], timeout=1) await asyncio.wait([self.on_transaction_address(tx, address) for tx in txs], timeout=1)
remote_status = await self.ledger.network.subscribe_address(address) remote_status = await self.ledger.network.subscribe_address(client, address)
self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertTrue(await self.ledger.update_history(client, address, remote_status))
# server history grows unordered # server history grows unordered
txid = await self.blockchain.send_to_address(address, 1) txid = await self.blockchain.send_to_address(address, 1)
await self.on_transaction_id(txid) await self.on_transaction_id(txid)
self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertTrue(await self.ledger.update_history(client, address, remote_status))
self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1]))
self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync))

View file

@ -8,16 +8,14 @@ from lbry.wallet.dewies import dict_values_to_lbc
class WalletCommands(CommandTestCase): class WalletCommands(CommandTestCase):
async def test_wallet_create_and_add_subscribe(self): async def test_wallet_create_and_add_subscribe(self):
session = next(iter(self.conductor.spv_node.server.session_mgr.sessions)) self.assertSetEqual({0, 27}, {len(session.hashX_subs) for session in self.conductor.spv_node.server.session_mgr.sessions})
self.assertEqual(len(session.hashX_subs), 27)
wallet = await self.daemon.jsonrpc_wallet_create('foo', create_account=True, single_key=True) wallet = await self.daemon.jsonrpc_wallet_create('foo', create_account=True, single_key=True)
self.assertEqual(len(session.hashX_subs), 28) self.assertSetEqual({0, 27, 1}, {len(session.hashX_subs) for session in self.conductor.spv_node.server.session_mgr.sessions})
await self.daemon.jsonrpc_wallet_remove(wallet.id) await self.daemon.jsonrpc_wallet_remove(wallet.id)
self.assertEqual(len(session.hashX_subs), 27) self.assertSetEqual({0, 27}, {len(session.hashX_subs) for session in self.conductor.spv_node.server.session_mgr.sessions})
await self.daemon.jsonrpc_wallet_add(wallet.id) await self.daemon.jsonrpc_wallet_add(wallet.id)
self.assertEqual(len(session.hashX_subs), 28) self.assertSetEqual({0, 27, 1}, {len(session.hashX_subs) for session in self.conductor.spv_node.server.session_mgr.sessions})
async def test_wallet_syncing_status(self): async def test_wallet_syncing_status(self):
address = await self.daemon.jsonrpc_address_unused() address = await self.daemon.jsonrpc_address_unused()

View file

@ -16,12 +16,12 @@ class MockNetwork:
self.address = None self.address = None
self.get_history_called = [] self.get_history_called = []
self.get_transaction_called = [] self.get_transaction_called = []
self.is_connected = False self.is_connected = lambda _: False
def retriable_call(self, function, *args, **kwargs): def retriable_call(self, function, *args, **kwargs):
return function(*args, **kwargs) return function(*args, **kwargs)
async def get_history(self, address): async def get_history(self, address, session=None):
self.get_history_called.append(address) self.get_history_called.append(address)
self.address = address self.address = address
return self.history return self.history
@ -40,7 +40,7 @@ class MockNetwork:
merkle = await self.get_merkle(tx_hash, known_height) merkle = await self.get_merkle(tx_hash, known_height)
return tx, merkle return tx, merkle
async def get_transaction_batch(self, txids): async def get_transaction_batch(self, txids, session=None):
return { return {
txid: await self.get_transaction_and_merkle(txid) txid: await self.get_transaction_and_merkle(txid)
for txid in txids for txid in txids
@ -111,7 +111,7 @@ class TestSynchronization(LedgerTestCase):
txid2: hexlify(get_transaction(get_output(2)).raw), txid2: hexlify(get_transaction(get_output(2)).raw),
txid3: hexlify(get_transaction(get_output(3)).raw), txid3: hexlify(get_transaction(get_output(3)).raw),
}) })
await self.ledger.update_history(address, '') await self.ledger.update_history(None, address, '')
self.assertListEqual(self.ledger.network.get_history_called, [address]) self.assertListEqual(self.ledger.network.get_history_called, [address])
self.assertListEqual(self.ledger.network.get_transaction_called, [txid1, txid2, txid3]) self.assertListEqual(self.ledger.network.get_transaction_called, [txid1, txid2, txid3])
@ -129,7 +129,7 @@ class TestSynchronization(LedgerTestCase):
self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified) self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified)
self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified) self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified)
self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified) self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified)
await self.ledger.update_history(address, '') await self.ledger.update_history(None, address, '')
self.assertListEqual(self.ledger.network.get_history_called, [address]) self.assertListEqual(self.ledger.network.get_history_called, [address])
self.assertListEqual(self.ledger.network.get_transaction_called, []) self.assertListEqual(self.ledger.network.get_transaction_called, [])
@ -137,7 +137,7 @@ class TestSynchronization(LedgerTestCase):
self.ledger.network.transaction[txid4] = hexlify(get_transaction(get_output(4)).raw) self.ledger.network.transaction[txid4] = hexlify(get_transaction(get_output(4)).raw)
self.ledger.network.get_history_called = [] self.ledger.network.get_history_called = []
self.ledger.network.get_transaction_called = [] self.ledger.network.get_transaction_called = []
await self.ledger.update_history(address, '') await self.ledger.update_history(None, address, '')
self.assertListEqual(self.ledger.network.get_history_called, [address]) self.assertListEqual(self.ledger.network.get_history_called, [address])
self.assertListEqual(self.ledger.network.get_transaction_called, [txid4]) self.assertListEqual(self.ledger.network.get_transaction_called, [txid4])
address_details = await self.ledger.db.get_address(address=address) address_details = await self.ledger.db.get_address(address=address)