From 925eb618de9bb88decd433fdd964d6d5f1f5c5e7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 30 Aug 2019 00:52:08 -0300 Subject: [PATCH] failing test for unordered mempool --- .../integration/test_transactions.py | 39 +++++++++++++++++++ torba/torba/client/baseledger.py | 8 +++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/torba/tests/client_tests/integration/test_transactions.py b/torba/tests/client_tests/integration/test_transactions.py index bdbd51dfa..4fb67f11b 100644 --- a/torba/tests/client_tests/integration/test_transactions.py +++ b/torba/tests/client_tests/integration/test_transactions.py @@ -1,6 +1,9 @@ import logging import asyncio +import random from itertools import chain +from random import shuffle + from torba.testcase import IntegrationTestCase from torba.client.util import satoshis_to_coins, coins_to_satoshis @@ -129,3 +132,39 @@ class BasicTransactionTests(IntegrationTestCase): self.assertEqual(tx.outputs[0].get_address(self.ledger), address2) self.assertEqual(tx.outputs[0].is_change, False) self.assertEqual(tx.outputs[1].is_change, True) + + async def test_history_edge_cases(self): + await self.assertBalance(self.account, '0.0') + address = await self.account.receiving.get_or_create_usable_address() + # evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it + original_summary = self.conductor.spv_node.server.mempool.transaction_summaries + async def random_summary(*args, **kwargs): + summary = await original_summary(*args, **kwargs) + if summary and len(summary) > 2: + ordered = summary.copy() + while summary == ordered: + random.shuffle(summary) + return summary + self.conductor.spv_node.server.mempool.transaction_summaries = random_summary + # 10 unconfirmed txs, all from blockchain wallet + sends = list(self.blockchain.send_to_address(address, 10) for _ in range(10)) + # use batching to reduce issues with send_to_address on cli + for batch in range(0, len(sends), 10): + txids = await asyncio.gather(*sends[batch:batch + 10]) + await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) + remote_status = await self.ledger.network.subscribe_address(address) + self.assertTrue(await self.ledger.update_history(address, remote_status)) + # 20 unconfirmed txs, 10 from blockchain, 10 from local to local + utxos = await self.account.get_utxos() + txs = [] + for utxo in utxos: + tx = await self.ledger.transaction_class.create( + [self.ledger.transaction_class.input_class.spend(utxo)], + [], + [self.account], self.account + ) + await self.broadcast(tx) + txs.append(tx) + await asyncio.wait([self.on_transaction_address(tx, address) for tx in txs], timeout=1) + remote_status = await self.ledger.network.subscribe_address(address) + self.assertTrue(await self.ledger.update_history(address, remote_status)) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index d127243d9..704654669 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -415,7 +415,7 @@ class BaseLedger(metaclass=LedgerRegistry): local_status, local_history = await self.get_local_status_and_history(address) if local_status == remote_status: - return + return True remote_history = await self.network.retriable_call(self.network.get_history, address) @@ -472,14 +472,18 @@ class BaseLedger(metaclass=LedgerRegistry): local_status, local_history = await self.get_local_status_and_history(address) if local_status != remote_status: + remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) + if remote_history == local_history: + return True log.debug( "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", remote_status, len(remote_history), local_status, len(local_history) ) log.debug("local: %s", local_history) log.debug("remote: %s", remote_history) + return False else: - log.debug("Sync completed for: %s", address) + return True async def cache_transaction(self, txid, remote_height): cache_item = self._tx_cache.get(txid)