Merge pull request #2959 from lbryio/sqlite-coin-chooser

Add all sqlite coin chooser
This commit is contained in:
Lex Berezhny 2020-06-08 18:46:42 -04:00 committed by GitHub
commit e70bdd86a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 350 additions and 20 deletions

View file

@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator
MAXIMUM_TRIES = 100000
STRATEGIES = []
STRATEGIES = ['sqlite'] # sqlite coin chooser is in database.py
def strategy(method):

View file

@ -4,6 +4,7 @@ import asyncio
import sqlite3
import platform
from binascii import hexlify
from collections import defaultdict
from dataclasses import dataclass
from contextvars import ContextVar
from concurrent.futures.thread import ThreadPoolExecutor
@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram
from lbry.utils import LockWithMetrics
from .bip32 import PubKey
from .transaction import Transaction, Output, OutputScript, TXRefImmutable
from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input
from .constants import TXO_TYPES, CLAIM_TYPES
from .util import date_to_julian_day
@ -466,6 +467,101 @@ def dict_row_factory(cursor, row):
return d
SQLITE_MAX_INTEGER = 9223372036854775807
def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decoded_transactions: Dict[str, Transaction],
result: Dict[Tuple[bytes, int, bool], List[int]], reserved: List[Transaction],
amount_to_reserve: int, reserved_amount: int, floor: int, ceiling: int,
fee_per_byte: int) -> int:
accounts_fmt = ",".join(["?"] * len(accounts))
txo_query = f"""
SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo
INNER JOIN account_address USING (address)
LEFT JOIN txi USING (txoid)
INNER JOIN tx USING (txid)
WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND NOT txo.is_reserved
AND txo.amount >= ? AND txo.amount < ?
"""
if accounts:
txo_query += f"""
AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'}
"""
txo_query += """
ORDER BY txo.amount ASC, tx.height DESC
"""
# prefer confirmed, but save unconfirmed utxos from this selection in case they are needed
unconfirmed = []
for row in transaction.execute(txo_query, (floor, ceiling, *accounts)):
(txid, txoid, raw, height, nout, verified, amount) = row.values()
# verified or non verified transactions were found- reset the gap count
# multiple txos can come from the same tx, only decode it once and cache
if txid not in decoded_transactions:
# cache the decoded transaction
decoded_transactions[txid] = Transaction(raw)
decoded_tx = decoded_transactions[txid]
# save the unconfirmed txo for possible use later, if still needed
if verified:
# add the txo to the reservation, minus the fee for including it
reserved_amount += amount
reserved_amount -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
# mark it as reserved
result[(raw, height, verified)].append(nout)
reserved.append(txoid)
# if we've reserved enough, return
if reserved_amount >= amount_to_reserve:
return reserved_amount
else:
unconfirmed.append((txid, txoid, raw, height, nout, verified, amount))
# we're popping the items, so to get them in the order they were seen they are reversed
unconfirmed.reverse()
# add available unconfirmed txos if any were previously found
while unconfirmed and reserved_amount < amount_to_reserve:
(txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop()
# it's already decoded
decoded_tx = decoded_transactions[txid]
# add to the reserved amount
reserved_amount += amount
reserved_amount -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
result[(raw, height, verified)].append(nout)
reserved.append(txoid)
return reserved_amount
def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List, amount_to_reserve: int, floor: int,
fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool):
txs = defaultdict(list)
decoded_transactions = {}
reserved = []
reserved_dewies = 0
multiplier = 10
gap_count = 0
while reserved_dewies < amount_to_reserve and gap_count < 5 and floor * multiplier < SQLITE_MAX_INTEGER:
previous_reserved_dewies = reserved_dewies
reserved_dewies = _get_spendable_utxos(
transaction, accounts, decoded_transactions, txs, reserved, amount_to_reserve, reserved_dewies,
floor, floor * multiplier, fee_per_byte
)
floor *= multiplier
if previous_reserved_dewies == reserved_dewies:
gap_count += 1
multiplier **= 2
else:
gap_count = 0
multiplier = 10
# reserve the accumulated txos if enough were found
if reserved_dewies >= amount_to_reserve:
if set_reserved:
transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?",
[(True, txoid) for txoid in reserved]).fetchall()
return txs
# return_insufficient_funds and set_reserved are used for testing
return txs if return_insufficient_funds else {}
class Database(SQLiteMixin):
SCHEMA_VERSION = "1.3"
@ -666,6 +762,20 @@ class Database(SQLiteMixin):
# 2. update address histories removing deleted TXs
return True
async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
fee_per_byte: int = 50, set_reserved: bool = True,
return_insufficient_funds: bool = False) -> List:
to_spend = await self.db.run(
get_and_reserve_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount,
fee_per_byte, set_reserved, return_insufficient_funds
)
txos = []
for (raw, height, verified), positions in to_spend.items():
tx = Transaction(raw, height=height, is_verified=verified)
for nout in positions:
txos.append(tx.outputs[nout].get_estimator(ledger))
return txos
async def select_transactions(self, cols, accounts=None, read_only=False, **constraints):
if not {'txid', 'txid__in'}.intersection(constraints):
assert accounts, "'accounts' argument required when no 'txid' constraint is present"

View file

@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
self._on_transaction_controller = StreamController()
self.on_transaction = self._on_transaction_controller.stream
self.on_transaction.listen(
lambda e: log.info(
lambda e: log.debug(
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
)
@ -244,11 +244,16 @@ class Ledger(metaclass=LedgerRegistry):
def get_address_count(self, **constraints):
return self.db.get_address_count(**constraints)
async def get_spendable_utxos(self, amount: int, funding_accounts):
async with self._utxo_reservation_lock:
txos = await self.get_effective_amount_estimators(funding_accounts)
async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
min_amount=100000):
min_amount = min(amount // 10, min_amount)
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
selector = CoinSelector(amount, fee)
async with self._utxo_reservation_lock:
if self.coin_selection_strategy == 'sqlite':
return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount,
fee_per_byte=self.fee_per_byte)
txos = await self.get_effective_amount_estimators(funding_accounts)
spendables = selector.select(txos, self.coin_selection_strategy)
if spendables:
await self.reserve_outputs(s.txo for s in spendables)
@ -563,13 +568,26 @@ class Ledger(metaclass=LedgerRegistry):
await self.get_local_status_and_history(address, synced_history.getvalue())
if local_status != remote_status:
if local_history == remote_history:
return True
log.warning(
"Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items",
remote_status, len(remote_history), local_status, len(local_history)
"%s has a synced history but a mismatched status", address
)
return True
remote_set = set(remote_history)
local_set = set(local_history)
log.warning(
"%s is out of sync after syncing.\n"
"Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n"
"Histories are mismatched on %i items.\n"
"Local is missing\n"
"%s\n"
"Remote is missing\n"
"%s\n"
"******",
address, remote_status, len(remote_history), len(remote_set),
local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
"\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]),
"\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)])
)
log.warning("local: %s", local_history)
log.warning("remote: %s", remote_history)
self._known_addresses_out_of_sync.add(address)
return False
else:

View file

@ -300,8 +300,8 @@ class WalletManager:
async def broadcast_or_release(self, tx, blocking=False):
try:
await self.ledger.broadcast(tx)
if blocking:
await self.ledger.wait(tx, timeout=None)
except:
await self.ledger.release_tx(tx)
raise
if blocking:
await self.ledger.wait(tx, timeout=None)

View file

@ -5,6 +5,7 @@ from itertools import chain
from lbry.wallet.transaction import Transaction, Output, Input
from lbry.testcase import IntegrationTestCase
from lbry.wallet.util import satoshis_to_coins, coins_to_satoshis
from lbry.wallet.manager import WalletManager
class BasicTransactionTests(IntegrationTestCase):
@ -173,3 +174,130 @@ class BasicTransactionTests(IntegrationTestCase):
self.assertTrue(await self.ledger.update_history(address, remote_status))
self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1]))
self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync))
def wait_for_txid(self, txid, address):
return self.ledger.on_transaction.where(
lambda e: e.tx.id == txid and e.address == address
)
async def _test_transaction(self, send_amount, address, inputs, change):
tx = await Transaction.create(
[], [Output.pay_pubkey_hash(send_amount, self.ledger.address_to_hash160(address))], [self.account],
self.account
)
await self.ledger.broadcast(tx)
input_amounts = [txi.amount for txi in tx.inputs]
self.assertListEqual(inputs, input_amounts)
self.assertEqual(len(inputs), len(tx.inputs))
self.assertEqual(2, len(tx.outputs))
self.assertEqual(send_amount, tx.outputs[0].amount)
self.assertEqual(change, tx.outputs[1].amount)
return tx
async def assertSpendable(self, amounts):
spendable = await self.ledger.db.get_spendable_utxos(
self.ledger, 2000000000000, [self.account], set_reserved=False, return_insufficient_funds=True
)
got_amounts = [estimator.effective_amount for estimator in spendable]
self.assertListEqual(amounts, got_amounts)
async def test_sqlite_coin_chooser(self):
wallet_manager = WalletManager([self.wallet], {self.ledger.get_id(): self.ledger})
await self.blockchain.generate(300)
await self.assertBalance(self.account, '0.0')
address = await self.account.receiving.get_or_create_usable_address()
other_account = self.wallet.generate_account(self.ledger)
other_address = await other_account.receiving.get_or_create_usable_address()
self.ledger.coin_selection_strategy = 'sqlite'
await self.ledger.subscribe_account(self.account)
txids = []
txids.append(await self.blockchain.send_to_address(address, 1.0))
txids.append(await self.blockchain.send_to_address(address, 1.0))
txids.append(await self.blockchain.send_to_address(address, 3.0))
txids.append(await self.blockchain.send_to_address(address, 5.0))
txids.append(await self.blockchain.send_to_address(address, 10.0))
await asyncio.wait([self.wait_for_txid(txid, address) for txid in txids], timeout=1)
await self.assertBalance(self.account, '20.0')
await self.assertSpendable([99992600, 99992600, 299992600, 499992600, 999992600])
# send 1.5 lbc
first_tx = await Transaction.create(
[], [Output.pay_pubkey_hash(150000000, self.ledger.address_to_hash160(other_address))], [self.account],
self.account
)
self.assertEqual(2, len(first_tx.inputs))
self.assertEqual(2, len(first_tx.outputs))
self.assertEqual(100000000, first_tx.inputs[0].amount)
self.assertEqual(100000000, first_tx.inputs[1].amount)
self.assertEqual(150000000, first_tx.outputs[0].amount)
self.assertEqual(49980200, first_tx.outputs[1].amount)
await self.assertBalance(self.account, '18.0')
await self.assertSpendable([299992600, 499992600, 999992600])
await wallet_manager.broadcast_or_release(first_tx, blocking=True)
await self.assertSpendable([49972800, 299992600, 499992600, 999992600])
# 0.499, 3.0, 5.0, 10.0
await self.assertBalance(self.account, '18.499802')
# send 1.5lbc again
second_tx = await self._test_transaction(150000000, other_address, [49980200, 300000000], 199960400)
await self.assertSpendable([499992600, 999992600])
# replicate cancelling the api call after the tx broadcast while ledger.wait'ing it
e = asyncio.Event()
real_broadcast = self.ledger.broadcast
async def broadcast(tx):
try:
return await real_broadcast(tx)
finally:
e.set()
self.ledger.broadcast = broadcast
broadcast_task = asyncio.create_task(wallet_manager.broadcast_or_release(second_tx, blocking=True))
# wait for the broadcast to finish
await e.wait()
# cancel the api call
broadcast_task.cancel()
with self.assertRaises(asyncio.CancelledError):
await broadcast_task
# test if sending another 1.5 lbc will try to double spend the inputs from the cancelled tx
tx1 = await self._test_transaction(150000000, other_address, [500000000], 349987600)
await self.ledger.wait(tx1, timeout=1)
# wait for the cancelled transaction too, so that it's in the database
# needed to keep everything deterministic
await self.ledger.wait(second_tx, timeout=1)
await self.assertSpendable([199953000, 349980200, 999992600])
# spend deep into the mempool and see what else breaks
tx2 = await self._test_transaction(150000000, other_address, [199960400], 49948000)
await self.assertSpendable([349980200, 999992600])
await self.ledger.wait(tx2, timeout=1)
await self.assertSpendable([49940600, 349980200, 999992600])
tx3 = await self._test_transaction(150000000, other_address, [49948000, 349987600], 249915800)
await self.assertSpendable([999992600])
await self.ledger.wait(tx3, timeout=1)
await self.assertSpendable([249908400, 999992600])
tx4 = await self._test_transaction(150000000, other_address, [249915800], 99903400)
await self.assertSpendable([999992600])
await self.ledger.wait(tx4, timeout=1)
await self.assertBalance(self.account, '10.999034')
await self.assertSpendable([99896000, 999992600])
# spend more
tx5 = await self._test_transaction(100000000, other_address, [99903400, 1000000000], 999883600)
await self.assertSpendable([])
await self.ledger.wait(tx5, timeout=1)
await self.assertSpendable([999876200])
await self.assertBalance(self.account, '9.998836')

View file

@ -28,9 +28,10 @@ def mock_config():
class BlobExchangeTestBase(AsyncioTestCase):
async def asyncSetUp(self):
self.loop = asyncio.get_event_loop()
self.client_wallet_dir = tempfile.mkdtemp()
self.client_dir = tempfile.mkdtemp()
self.server_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.client_wallet_dir)
self.addCleanup(shutil.rmtree, self.client_dir)
self.addCleanup(shutil.rmtree, self.server_dir)
self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
@ -39,8 +40,8 @@ class BlobExchangeTestBase(AsyncioTestCase):
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
fixed_peers=[])
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir,
wallet=self.client_wallet_dir, fixed_peers=[])
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
self.client_peer_manager = PeerManager(self.loop)

View file

@ -65,7 +65,7 @@ def get_claim_transaction(claim_name, claim=b''):
)
async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
async def get_mock_wallet(sd_hash, storage, wallet_dir, balance=10.0, fee=None):
claim = Claim()
if fee:
if fee['currency'] == 'LBC':
@ -97,7 +97,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
wallet = Wallet()
ledger = Ledger({
'db': Database(':memory:'),
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
'headers': FakeHeaders(514082)
})
await ledger.db.open()
@ -136,7 +136,8 @@ class TestStreamManager(BlobExchangeTestBase):
self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
)
self.sd_hash = descriptor.sd_hash
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir,
balance, fee)
analytics_manager = AnalyticsManager(
self.client_config,
binascii.hexlify(generate_id()).decode(),

View file

@ -1,4 +1,7 @@
import os
import unittest
import tempfile
import shutil
from binascii import hexlify, unhexlify
from itertools import cycle
@ -302,9 +305,11 @@ class TestTransactionSigning(AsyncioTestCase):
class TransactionIOBalancing(AsyncioTestCase):
async def asyncSetUp(self):
wallet_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, wallet_dir)
self.ledger = Ledger({
'db': Database(':memory:'),
'headers': Headers(':memory:')
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
'headers': Headers(':memory:'),
})
await self.ledger.db.open()
self.account = Account.from_dict(
@ -419,3 +424,70 @@ class TransactionIOBalancing(AsyncioTestCase):
self.assertListEqual([0.01, 1], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([0.97], self.outputs(tx))
async def test_basic_use_cases_sqlite(self):
self.ledger.coin_selection_strategy = 'sqlite'
self.ledger.fee_per_byte = int(0.01*CENT)
# available UTXOs for filling missing inputs
utxos = await self.create_utxos([
1, 1, 3, 5, 10
])
self.assertEqual(5, len(await self.ledger.get_utxos()))
# pay 3 coins (3.07 w/ fees)
tx = await self.tx(
[], # inputs
[self.txo(3)] # outputs
)
await self.ledger.db.db.run(self.ledger.db._transaction_io, tx, tx.outputs[0].get_address(self.ledger), tx.id)
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 3.0])
# a change of 1.95 is added to reach balance
self.assertListEqual(self.outputs(tx), [3, 1.95])
# utxos: 1.95, 3, 5, 10
self.assertEqual(2, len(await self.ledger.get_utxos()))
# pay 4.946 coins (5.00 w/ fees)
tx = await self.tx(
[], # inputs
[self.txo(4.946)] # outputs
)
self.assertEqual(1, len(await self.ledger.get_utxos()))
self.assertListEqual(self.inputs(tx), [5.0])
self.assertEqual(2, len(tx.outputs))
self.assertEqual(494600000, tx.outputs[0].amount)
# utxos: 3, 1.95, 4.946, 10
await self.ledger.release_outputs(utxos)
# supplied input and output, but input is not enough to cover output
tx = await self.tx(
[self.txi(self.txo(10))], # inputs
[self.txo(11)] # outputs
)
# additional input is chosen (UTXO 1)
self.assertListEqual([10, 1.0, 1.0], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([11, 0.95], self.outputs(tx))
await self.ledger.release_outputs(utxos)
# liquidating a UTXO
tx = await self.tx(
[self.txi(self.txo(10))], # inputs
[] # outputs
)
self.assertListEqual([10], self.inputs(tx))
# missing change added to consume the amount
self.assertListEqual([9.98], self.outputs(tx))
await self.ledger.release_outputs(utxos)
# liquidating at a loss, requires adding extra inputs
tx = await self.tx(
[self.txi(self.txo(0.01))], # inputs
[] # outputs
)
# UTXO 1 is added to cover some of the fee
self.assertListEqual([0.01, 1], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([0.97], self.outputs(tx))