From a32a2ef04eccdff6038a2b9581abdfd6a439c9f9 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jun 2020 10:18:04 -0400 Subject: [PATCH 1/7] add sqlite coin chooser --- lbry/wallet/coinselection.py | 2 +- lbry/wallet/database.py | 105 ++++++++++++++++++++++++++++++++++- lbry/wallet/ledger.py | 11 +++- 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/coinselection.py b/lbry/wallet/coinselection.py index 63bbb6977..182c6d0d4 100644 --- a/lbry/wallet/coinselection.py +++ b/lbry/wallet/coinselection.py @@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator MAXIMUM_TRIES = 100000 -STRATEGIES = [] +STRATEGIES = ['sqlite'] # sqlite coin chooser is in database.py def strategy(method): diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 15c866017..f1672d0f1 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -4,6 +4,7 @@ import asyncio import sqlite3 import platform from binascii import hexlify +from collections import defaultdict from dataclasses import dataclass from contextvars import ContextVar from concurrent.futures.thread import ThreadPoolExecutor @@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram from lbry.utils import LockWithMetrics from .bip32 import PubKey -from .transaction import Transaction, Output, OutputScript, TXRefImmutable +from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input from .constants import TXO_TYPES, CLAIM_TYPES from .util import date_to_julian_day @@ -466,6 +467,95 @@ def dict_row_factory(cursor, row): return d +SQLITE_MAX_INTEGER = 9223372036854775807 + + +def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decoded_transactions: Dict[str, Transaction], + result: Dict[Tuple[bytes, int, bool], List[int]], reserved: List[Transaction], + amount_to_reserve: int, reserved_amount: int, floor: int, ceiling: int, + fee_per_byte: int) -> int: + accounts_fmt = ",".join(["?"] * len(accounts)) + txo_query = f""" + SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo + INNER JOIN account_address USING (address) + LEFT JOIN txi USING (txoid) + INNER JOIN tx USING (txid) + WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND NOT txo.is_reserved + AND txo.amount >= ? AND txo.amount < ? + """ + if accounts: + txo_query += f""" + AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'} + """ + # prefer confirmed, but save unconfirmed utxos from this selection in case they are needed + unconfirmed = [] + for row in transaction.execute(txo_query, (floor, ceiling, *accounts)): + (txid, txoid, raw, height, nout, verified, amount) = row.values() + # verified or non verified transactions were found- reset the gap count + # multiple txos can come from the same tx, only decode it once and cache + if txid not in decoded_transactions: + # cache the decoded transaction + decoded_transactions[txid] = Transaction(raw) + decoded_tx = decoded_transactions[txid] + # save the unconfirmed txo for possible use later, if still needed + if verified: + # add the txo to the reservation, minus the fee for including it + reserved_amount += amount + reserved_amount -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte + # mark it as reserved + result[(raw, height, verified)].append(nout) + reserved.append(txoid) + # if we've reserved enough, return + if reserved_amount >= amount_to_reserve: + return reserved_amount + else: + unconfirmed.append((txid, txoid, raw, height, nout, verified, amount)) + # we're popping the items, so to get them in the order they were seen they are reversed + unconfirmed.reverse() + # add available unconfirmed txos if any were previously found + while unconfirmed and reserved_amount < amount_to_reserve: + (txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop() + # it's already decoded + decoded_tx = decoded_transactions[txid] + # add to the reserved amount + reserved_amount += amount + reserved_amount -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte + result[(raw, height, verified)].append(nout) + reserved.append(txoid) + return reserved_amount + + +def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List, amount_to_reserve: int, floor: int, + fee_per_byte: int, set_reserved: bool): + txs = defaultdict(list) + decoded_transactions = {} + reserved = [] + + reserved_dewies = 0 + multiplier = 10 + gap_count = 0 + + while reserved_dewies < amount_to_reserve and gap_count < 5 and floor * multiplier < SQLITE_MAX_INTEGER: + previous_reserved_dewies = reserved_dewies + reserved_dewies = _get_spendable_utxos( + transaction, accounts, decoded_transactions, txs, reserved, amount_to_reserve, reserved_dewies, + floor, floor * multiplier, fee_per_byte + ) + floor *= multiplier + if previous_reserved_dewies == reserved_dewies: + gap_count += 1 + multiplier **= 2 + else: + gap_count = 0 + multiplier = 10 + + # reserve the accumulated txos if enough were found + if reserved_dewies >= amount_to_reserve and set_reserved: + transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", + [(True, txoid) for txoid in reserved]).fetchall() + return txs + + class Database(SQLiteMixin): SCHEMA_VERSION = "1.3" @@ -666,6 +756,19 @@ class Database(SQLiteMixin): # 2. update address histories removing deleted TXs return True + async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000, + fee_per_byte: int = 50, set_reserved: bool = True) -> List: + to_spend = await self.db.run( + get_and_reserve_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount, + fee_per_byte, set_reserved + ) + txos = [] + for (raw, height, verified), positions in to_spend.items(): + tx = Transaction(raw, height=height, is_verified=verified) + for nout in positions: + txos.append(tx.outputs[nout].get_estimator(ledger)) + return txos + async def select_transactions(self, cols, accounts=None, read_only=False, **constraints): if not {'txid', 'txid__in'}.intersection(constraints): assert accounts, "'accounts' argument required when no 'txid' constraint is present" diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 50adf7467..194f1baa7 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -244,11 +244,16 @@ class Ledger(metaclass=LedgerRegistry): def get_address_count(self, **constraints): return self.db.get_address_count(**constraints) - async def get_spendable_utxos(self, amount: int, funding_accounts): + async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], + min_amount=100000): + min_amount = min(amount // 10, min_amount) + fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self) + selector = CoinSelector(amount, fee) async with self._utxo_reservation_lock: + if self.coin_selection_strategy == 'sqlite': + return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount, + fee_per_byte=self.fee_per_byte) txos = await self.get_effective_amount_estimators(funding_accounts) - fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self) - selector = CoinSelector(amount, fee) spendables = selector.select(txos, self.coin_selection_strategy) if spendables: await self.reserve_outputs(s.txo for s in spendables) From 9dc6092cb012f07a456ca3c6091e60e224abdeac Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jun 2020 10:18:14 -0400 Subject: [PATCH 2/7] update tests --- .../unit/blob_exchange/test_transfer_blob.py | 7 +- tests/unit/stream/test_stream_manager.py | 7 +- tests/unit/wallet/test_transaction.py | 76 ++++++++++++++++++- 3 files changed, 82 insertions(+), 8 deletions(-) diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index b6339f375..8968116af 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -28,9 +28,10 @@ def mock_config(): class BlobExchangeTestBase(AsyncioTestCase): async def asyncSetUp(self): self.loop = asyncio.get_event_loop() - + self.client_wallet_dir = tempfile.mkdtemp() self.client_dir = tempfile.mkdtemp() self.server_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.client_wallet_dir) self.addCleanup(shutil.rmtree, self.client_dir) self.addCleanup(shutil.rmtree, self.server_dir) self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir, @@ -39,8 +40,8 @@ class BlobExchangeTestBase(AsyncioTestCase): self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config) self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') - self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir, - fixed_peers=[]) + self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, + wallet=self.client_wallet_dir, fixed_peers=[]) self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite")) self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config) self.client_peer_manager = PeerManager(self.loop) diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 8c17f61e4..fa307d65f 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -65,7 +65,7 @@ def get_claim_transaction(claim_name, claim=b''): ) -async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): +async def get_mock_wallet(sd_hash, storage, wallet_dir, balance=10.0, fee=None): claim = Claim() if fee: if fee['currency'] == 'LBC': @@ -97,7 +97,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): wallet = Wallet() ledger = Ledger({ - 'db': Database(':memory:'), + 'db': Database(os.path.join(wallet_dir, 'blockchain.db')), 'headers': FakeHeaders(514082) }) await ledger.db.open() @@ -136,7 +136,8 @@ class TestStreamManager(BlobExchangeTestBase): self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort ) self.sd_hash = descriptor.sd_hash - self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) + self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir, + balance, fee) analytics_manager = AnalyticsManager( self.client_config, binascii.hexlify(generate_id()).decode(), diff --git a/tests/unit/wallet/test_transaction.py b/tests/unit/wallet/test_transaction.py index 7c0942cd0..5468b85b6 100644 --- a/tests/unit/wallet/test_transaction.py +++ b/tests/unit/wallet/test_transaction.py @@ -1,4 +1,7 @@ +import os import unittest +import tempfile +import shutil from binascii import hexlify, unhexlify from itertools import cycle @@ -302,9 +305,11 @@ class TestTransactionSigning(AsyncioTestCase): class TransactionIOBalancing(AsyncioTestCase): async def asyncSetUp(self): + wallet_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, wallet_dir) self.ledger = Ledger({ - 'db': Database(':memory:'), - 'headers': Headers(':memory:') + 'db': Database(os.path.join(wallet_dir, 'blockchain.db')), + 'headers': Headers(':memory:'), }) await self.ledger.db.open() self.account = Account.from_dict( @@ -419,3 +424,70 @@ class TransactionIOBalancing(AsyncioTestCase): self.assertListEqual([0.01, 1], self.inputs(tx)) # change is now needed to consume extra input self.assertListEqual([0.97], self.outputs(tx)) + + async def test_basic_use_cases_sqlite(self): + self.ledger.coin_selection_strategy = 'sqlite' + self.ledger.fee_per_byte = int(0.01*CENT) + + # available UTXOs for filling missing inputs + utxos = await self.create_utxos([ + 1, 1, 3, 5, 10 + ]) + + self.assertEqual(5, len(await self.ledger.get_utxos())) + + # pay 3 coins (3.07 w/ fees) + tx = await self.tx( + [], # inputs + [self.txo(3)] # outputs + ) + + await self.ledger.db.db.run(self.ledger.db._transaction_io, tx, tx.outputs[0].get_address(self.ledger), tx.id) + + self.assertListEqual(self.inputs(tx), [1.0, 1.0, 3.0]) + # a change of 1.95 is added to reach balance + self.assertListEqual(self.outputs(tx), [3, 1.95]) + # utxos: 1.95, 3, 5, 10 + self.assertEqual(2, len(await self.ledger.get_utxos())) + # pay 4.946 coins (5.00 w/ fees) + tx = await self.tx( + [], # inputs + [self.txo(4.946)] # outputs + ) + self.assertEqual(1, len(await self.ledger.get_utxos())) + + self.assertListEqual(self.inputs(tx), [5.0]) + self.assertEqual(2, len(tx.outputs)) + self.assertEqual(494600000, tx.outputs[0].amount) + + # utxos: 3, 1.95, 4.946, 10 + await self.ledger.release_outputs(utxos) + + # supplied input and output, but input is not enough to cover output + tx = await self.tx( + [self.txi(self.txo(10))], # inputs + [self.txo(11)] # outputs + ) + # additional input is chosen (UTXO 1) + self.assertListEqual([10, 1.0, 1.0], self.inputs(tx)) + # change is now needed to consume extra input + self.assertListEqual([11, 0.95], self.outputs(tx)) + await self.ledger.release_outputs(utxos) + # liquidating a UTXO + tx = await self.tx( + [self.txi(self.txo(10))], # inputs + [] # outputs + ) + self.assertListEqual([10], self.inputs(tx)) + # missing change added to consume the amount + self.assertListEqual([9.98], self.outputs(tx)) + await self.ledger.release_outputs(utxos) + # liquidating at a loss, requires adding extra inputs + tx = await self.tx( + [self.txi(self.txo(0.01))], # inputs + [] # outputs + ) + # UTXO 1 is added to cover some of the fee + self.assertListEqual([0.01, 1], self.inputs(tx)) + # change is now needed to consume extra input + self.assertListEqual([0.97], self.outputs(tx)) From 5de944146a95a95ba1b5a4dca43c2712088069f4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jun 2020 10:19:24 -0400 Subject: [PATCH 3/7] logging --- lbry/wallet/ledger.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 194f1baa7..beb5e3b1d 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_transaction_controller = StreamController() self.on_transaction = self._on_transaction_controller.stream self.on_transaction.listen( - lambda e: log.info( + lambda e: log.debug( '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id ) @@ -568,13 +568,26 @@ class Ledger(metaclass=LedgerRegistry): await self.get_local_status_and_history(address, synced_history.getvalue()) if local_status != remote_status: if local_history == remote_history: + log.warning( + "%s has a synced history but a mismatched status", address + ) return True + remote_set = set(remote_history) + local_set = set(local_history) log.warning( - "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", - remote_status, len(remote_history), local_status, len(local_history) + "%s is out of sync after syncing.\n" + "Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n" + "Histories are mismatched on %i items.\n" + "Local is missing\n" + "%s\n" + "Remote is missing\n" + "%s\n" + "******", + address, remote_status, len(remote_history), len(remote_set), + local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)), + "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]), + "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]) ) - log.warning("local: %s", local_history) - log.warning("remote: %s", remote_history) self._known_addresses_out_of_sync.add(address) return False else: From 17f76c9cb38386eacd72ee3d01c3c87e5d965e99 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jun 2020 10:19:54 -0400 Subject: [PATCH 4/7] leave inputs for rejected tx reserved --- lbry/wallet/manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 22f0e5d21..11d90f11d 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -302,6 +302,9 @@ class WalletManager: await self.ledger.broadcast(tx) if blocking: await self.ledger.wait(tx, timeout=None) + except CodeMessageError as err: + log.warning("transaction rejected, leaving reserved") + raise except: await self.ledger.release_tx(tx) raise From ae53062518b94835a69b694227d2007bc6ed70e1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 5 Jun 2020 20:05:13 -0400 Subject: [PATCH 5/7] integration test for sqlite coin chooser --- lbry/wallet/database.py | 21 ++- .../blockchain/test_transactions.py | 126 ++++++++++++++++++ 2 files changed, 140 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index f1672d0f1..bc6d880f8 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -487,6 +487,9 @@ def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decode txo_query += f""" AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'} """ + txo_query += """ + ORDER BY txo.amount ASC, tx.height DESC + """ # prefer confirmed, but save unconfirmed utxos from this selection in case they are needed unconfirmed = [] for row in transaction.execute(txo_query, (floor, ceiling, *accounts)): @@ -526,7 +529,7 @@ def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decode def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List, amount_to_reserve: int, floor: int, - fee_per_byte: int, set_reserved: bool): + fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool): txs = defaultdict(list) decoded_transactions = {} reserved = [] @@ -550,10 +553,13 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: L multiplier = 10 # reserve the accumulated txos if enough were found - if reserved_dewies >= amount_to_reserve and set_reserved: - transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", - [(True, txoid) for txoid in reserved]).fetchall() - return txs + if reserved_dewies >= amount_to_reserve: + if set_reserved: + transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", + [(True, txoid) for txoid in reserved]).fetchall() + return txs + # return_insufficient_funds and set_reserved are used for testing + return txs if return_insufficient_funds else {} class Database(SQLiteMixin): @@ -757,10 +763,11 @@ class Database(SQLiteMixin): return True async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000, - fee_per_byte: int = 50, set_reserved: bool = True) -> List: + fee_per_byte: int = 50, set_reserved: bool = True, + return_insufficient_funds: bool = False) -> List: to_spend = await self.db.run( get_and_reserve_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount, - fee_per_byte, set_reserved + fee_per_byte, set_reserved, return_insufficient_funds ) txos = [] for (raw, height, verified), positions in to_spend.items(): diff --git a/tests/integration/blockchain/test_transactions.py b/tests/integration/blockchain/test_transactions.py index 6a0ace201..5e5ae5346 100644 --- a/tests/integration/blockchain/test_transactions.py +++ b/tests/integration/blockchain/test_transactions.py @@ -5,6 +5,7 @@ from itertools import chain from lbry.wallet.transaction import Transaction, Output, Input from lbry.testcase import IntegrationTestCase from lbry.wallet.util import satoshis_to_coins, coins_to_satoshis +from lbry.wallet.manager import WalletManager class BasicTransactionTests(IntegrationTestCase): @@ -173,3 +174,128 @@ class BasicTransactionTests(IntegrationTestCase): self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) + + def wait_for_txid(self, txid, address): + return self.ledger.on_transaction.where( + lambda e: e.tx.id == txid and e.address == address + ) + + async def _test_transaction(self, send_amount, address, inputs, change): + tx = await Transaction.create( + [], [Output.pay_pubkey_hash(send_amount, self.ledger.address_to_hash160(address))], [self.account], + self.account + ) + await self.ledger.broadcast(tx) + input_amounts = [txi.amount for txi in tx.inputs] + self.assertListEqual(inputs, input_amounts) + self.assertEqual(len(inputs), len(tx.inputs)) + self.assertEqual(2, len(tx.outputs)) + self.assertEqual(send_amount, tx.outputs[0].amount) + self.assertEqual(change, tx.outputs[1].amount) + return tx + + async def assertSpendable(self, amounts): + spendable = await self.ledger.db.get_spendable_utxos( + self.ledger, 2000000000000, [self.account], set_reserved=False, return_insufficient_funds=True + ) + got_amounts = [estimator.effective_amount for estimator in spendable] + self.assertListEqual(amounts, got_amounts) + + async def test_sqlite_coin_chooser(self): + wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger}) + await self.blockchain.generate(300) + await self.assertBalance(self.account, '0.0') + address = await self.account.receiving.get_or_create_usable_address() + other_account = self.wallet.generate_account(self.ledger) + other_address = await other_account.receiving.get_or_create_usable_address() + self.ledger.coin_selection_strategy = 'sqlite' + await self.ledger.subscribe_account(self.account) + + txids = [] + txids.append(await self.blockchain.send_to_address(address, 1.0)) + txids.append(await self.blockchain.send_to_address(address, 1.0)) + txids.append(await self.blockchain.send_to_address(address, 3.0)) + txids.append(await self.blockchain.send_to_address(address, 5.0)) + txids.append(await self.blockchain.send_to_address(address, 10.0)) + + await asyncio.wait([self.wait_for_txid(txid, address) for txid in txids], timeout=1) + await self.assertBalance(self.account, '20.0') + await self.assertSpendable([99992600, 99992600, 299992600, 499992600, 999992600]) + + # send 1.5 lbc + + first_tx = await Transaction.create( + [], [Output.pay_pubkey_hash(150000000, self.ledger.address_to_hash160(other_address))], [self.account], + self.account + ) + + self.assertEqual(2, len(first_tx.inputs)) + self.assertEqual(2, len(first_tx.outputs)) + self.assertEqual(100000000, first_tx.inputs[0].amount) + self.assertEqual(100000000, first_tx.inputs[1].amount) + self.assertEqual(150000000, first_tx.outputs[0].amount) + self.assertEqual(49980200, first_tx.outputs[1].amount) + + await self.assertBalance(self.account, '18.0') + await self.assertSpendable([299992600, 499992600, 999992600]) + + await wallet_manager.broadcast_or_release(first_tx, blocking=True) + await self.assertSpendable([49972800, 299992600, 499992600, 999992600]) + # 0.499, 3.0, 5.0, 10.0 + await self.assertBalance(self.account, '18.499802') + + # send 1.5lbc again + + second_tx = await self._test_transaction(150000000, other_address, [49980200, 300000000], 199960400) + await self.assertSpendable([499992600, 999992600]) + + # replicate cancelling the api call after the tx broadcast while ledger.wait'ing it + e = asyncio.Event() + + real_broadcast = self.ledger.broadcast + + async def broadcast(tx): + try: + return await real_broadcast(tx) + finally: + e.set() + + self.ledger.broadcast = broadcast + + broadcast_task = asyncio.create_task(wallet_manager.broadcast_or_release(second_tx, blocking=True)) + # wait for the broadcast to finish + await e.wait() + # cancel the api call + broadcast_task.cancel() + with self.assertRaises(asyncio.CancelledError): + await broadcast_task + + # test if sending another 1.5 lbc will try to double spend the inputs from the cancelled tx + tx1 = await self._test_transaction(150000000, other_address, [500000000], 349987600) + await self.assertSpendable([199953000, 999992600]) + await self.ledger.broadcast(tx1) + + # spend deep into the mempool and see what else breaks + tx2 = await self._test_transaction(150000000, other_address, [199960400], 49948000) + await self.assertSpendable([349980200, 999992600]) + await self.ledger.broadcast(tx2) + + tx3 = await self._test_transaction(150000000, other_address, [349987600], 199975200) + await self.assertSpendable([49940600, 999992600]) + await self.ledger.broadcast(tx3) + + tx4 = await self._test_transaction(150000000, other_address, [49948000, 1000000000], 899928200) + await self.assertSpendable([199967800]) + await self.ledger.broadcast(tx4) + await self.ledger.wait(tx4, timeout=1) + + await self.assertBalance(self.account, '10.999034') + # spend more + tx5 = await self._test_transaction(100000000, other_address, [199975200], 99962800) + await self.assertSpendable([899920800]) + await self.ledger.broadcast(tx5) + await self.assertSpendable([899920800]) + await self.ledger.wait(tx5, timeout=1) + + await self.assertSpendable([99955400, 899920800]) + await self.assertBalance(self.account, '9.99891') From 872b89ee9328fed736e0251aed5e946ef7294251 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 5 Jun 2020 20:06:08 -0400 Subject: [PATCH 6/7] fix mempool conflicts following cancelled api calls that send transactions with the `blocking` flag --- lbry/wallet/manager.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 11d90f11d..bff251a06 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -300,11 +300,8 @@ class WalletManager: async def broadcast_or_release(self, tx, blocking=False): try: await self.ledger.broadcast(tx) - if blocking: - await self.ledger.wait(tx, timeout=None) - except CodeMessageError as err: - log.warning("transaction rejected, leaving reserved") - raise except: await self.ledger.release_tx(tx) raise + if blocking: + await self.ledger.wait(tx, timeout=None) From bc9f33c2e0c0edd9078be26476cf2d2b9ec6fe6c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 8 Jun 2020 18:06:58 -0400 Subject: [PATCH 7/7] fix test --- .../blockchain/test_transactions.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/tests/integration/blockchain/test_transactions.py b/tests/integration/blockchain/test_transactions.py index 5e5ae5346..b09926034 100644 --- a/tests/integration/blockchain/test_transactions.py +++ b/tests/integration/blockchain/test_transactions.py @@ -272,30 +272,32 @@ class BasicTransactionTests(IntegrationTestCase): # test if sending another 1.5 lbc will try to double spend the inputs from the cancelled tx tx1 = await self._test_transaction(150000000, other_address, [500000000], 349987600) - await self.assertSpendable([199953000, 999992600]) - await self.ledger.broadcast(tx1) + await self.ledger.wait(tx1, timeout=1) + # wait for the cancelled transaction too, so that it's in the database + # needed to keep everything deterministic + await self.ledger.wait(second_tx, timeout=1) + await self.assertSpendable([199953000, 349980200, 999992600]) # spend deep into the mempool and see what else breaks tx2 = await self._test_transaction(150000000, other_address, [199960400], 49948000) await self.assertSpendable([349980200, 999992600]) - await self.ledger.broadcast(tx2) + await self.ledger.wait(tx2, timeout=1) + await self.assertSpendable([49940600, 349980200, 999992600]) - tx3 = await self._test_transaction(150000000, other_address, [349987600], 199975200) - await self.assertSpendable([49940600, 999992600]) - await self.ledger.broadcast(tx3) + tx3 = await self._test_transaction(150000000, other_address, [49948000, 349987600], 249915800) + await self.assertSpendable([999992600]) + await self.ledger.wait(tx3, timeout=1) + await self.assertSpendable([249908400, 999992600]) - tx4 = await self._test_transaction(150000000, other_address, [49948000, 1000000000], 899928200) - await self.assertSpendable([199967800]) - await self.ledger.broadcast(tx4) + tx4 = await self._test_transaction(150000000, other_address, [249915800], 99903400) + await self.assertSpendable([999992600]) await self.ledger.wait(tx4, timeout=1) - await self.assertBalance(self.account, '10.999034') - # spend more - tx5 = await self._test_transaction(100000000, other_address, [199975200], 99962800) - await self.assertSpendable([899920800]) - await self.ledger.broadcast(tx5) - await self.assertSpendable([899920800]) - await self.ledger.wait(tx5, timeout=1) + await self.assertSpendable([99896000, 999992600]) - await self.assertSpendable([99955400, 899920800]) - await self.assertBalance(self.account, '9.99891') + # spend more + tx5 = await self._test_transaction(100000000, other_address, [99903400, 1000000000], 999883600) + await self.assertSpendable([]) + await self.ledger.wait(tx5, timeout=1) + await self.assertSpendable([999876200]) + await self.assertBalance(self.account, '9.998836')