From a32a2ef04eccdff6038a2b9581abdfd6a439c9f9 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jun 2020 10:18:04 -0400 Subject: [PATCH] add sqlite coin chooser --- lbry/wallet/coinselection.py | 2 +- lbry/wallet/database.py | 105 ++++++++++++++++++++++++++++++++++- lbry/wallet/ledger.py | 11 +++- 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/coinselection.py b/lbry/wallet/coinselection.py index 63bbb6977..182c6d0d4 100644 --- a/lbry/wallet/coinselection.py +++ b/lbry/wallet/coinselection.py @@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator MAXIMUM_TRIES = 100000 -STRATEGIES = [] +STRATEGIES = ['sqlite'] # sqlite coin chooser is in database.py def strategy(method): diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index 15c866017..f1672d0f1 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -4,6 +4,7 @@ import asyncio import sqlite3 import platform from binascii import hexlify +from collections import defaultdict from dataclasses import dataclass from contextvars import ContextVar from concurrent.futures.thread import ThreadPoolExecutor @@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram from lbry.utils import LockWithMetrics from .bip32 import PubKey -from .transaction import Transaction, Output, OutputScript, TXRefImmutable +from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input from .constants import TXO_TYPES, CLAIM_TYPES from .util import date_to_julian_day @@ -466,6 +467,95 @@ def dict_row_factory(cursor, row): return d +SQLITE_MAX_INTEGER = 9223372036854775807 + + +def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decoded_transactions: Dict[str, Transaction], + result: Dict[Tuple[bytes, int, bool], List[int]], reserved: List[Transaction], + amount_to_reserve: int, reserved_amount: int, floor: int, ceiling: int, + fee_per_byte: int) -> int: + accounts_fmt = ",".join(["?"] * len(accounts)) + txo_query = f""" + SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo + INNER JOIN account_address USING (address) + LEFT JOIN txi USING (txoid) + INNER JOIN tx USING (txid) + WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND NOT txo.is_reserved + AND txo.amount >= ? AND txo.amount < ? + """ + if accounts: + txo_query += f""" + AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'} + """ + # prefer confirmed, but save unconfirmed utxos from this selection in case they are needed + unconfirmed = [] + for row in transaction.execute(txo_query, (floor, ceiling, *accounts)): + (txid, txoid, raw, height, nout, verified, amount) = row.values() + # verified or non verified transactions were found- reset the gap count + # multiple txos can come from the same tx, only decode it once and cache + if txid not in decoded_transactions: + # cache the decoded transaction + decoded_transactions[txid] = Transaction(raw) + decoded_tx = decoded_transactions[txid] + # save the unconfirmed txo for possible use later, if still needed + if verified: + # add the txo to the reservation, minus the fee for including it + reserved_amount += amount + reserved_amount -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte + # mark it as reserved + result[(raw, height, verified)].append(nout) + reserved.append(txoid) + # if we've reserved enough, return + if reserved_amount >= amount_to_reserve: + return reserved_amount + else: + unconfirmed.append((txid, txoid, raw, height, nout, verified, amount)) + # we're popping the items, so to get them in the order they were seen they are reversed + unconfirmed.reverse() + # add available unconfirmed txos if any were previously found + while unconfirmed and reserved_amount < amount_to_reserve: + (txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop() + # it's already decoded + decoded_tx = decoded_transactions[txid] + # add to the reserved amount + reserved_amount += amount + reserved_amount -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte + result[(raw, height, verified)].append(nout) + reserved.append(txoid) + return reserved_amount + + +def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List, amount_to_reserve: int, floor: int, + fee_per_byte: int, set_reserved: bool): + txs = defaultdict(list) + decoded_transactions = {} + reserved = [] + + reserved_dewies = 0 + multiplier = 10 + gap_count = 0 + + while reserved_dewies < amount_to_reserve and gap_count < 5 and floor * multiplier < SQLITE_MAX_INTEGER: + previous_reserved_dewies = reserved_dewies + reserved_dewies = _get_spendable_utxos( + transaction, accounts, decoded_transactions, txs, reserved, amount_to_reserve, reserved_dewies, + floor, floor * multiplier, fee_per_byte + ) + floor *= multiplier + if previous_reserved_dewies == reserved_dewies: + gap_count += 1 + multiplier **= 2 + else: + gap_count = 0 + multiplier = 10 + + # reserve the accumulated txos if enough were found + if reserved_dewies >= amount_to_reserve and set_reserved: + transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", + [(True, txoid) for txoid in reserved]).fetchall() + return txs + + class Database(SQLiteMixin): SCHEMA_VERSION = "1.3" @@ -666,6 +756,19 @@ class Database(SQLiteMixin): # 2. update address histories removing deleted TXs return True + async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000, + fee_per_byte: int = 50, set_reserved: bool = True) -> List: + to_spend = await self.db.run( + get_and_reserve_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount, + fee_per_byte, set_reserved + ) + txos = [] + for (raw, height, verified), positions in to_spend.items(): + tx = Transaction(raw, height=height, is_verified=verified) + for nout in positions: + txos.append(tx.outputs[nout].get_estimator(ledger)) + return txos + async def select_transactions(self, cols, accounts=None, read_only=False, **constraints): if not {'txid', 'txid__in'}.intersection(constraints): assert accounts, "'accounts' argument required when no 'txid' constraint is present" diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 50adf7467..194f1baa7 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -244,11 +244,16 @@ class Ledger(metaclass=LedgerRegistry): def get_address_count(self, **constraints): return self.db.get_address_count(**constraints) - async def get_spendable_utxos(self, amount: int, funding_accounts): + async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], + min_amount=100000): + min_amount = min(amount // 10, min_amount) + fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self) + selector = CoinSelector(amount, fee) async with self._utxo_reservation_lock: + if self.coin_selection_strategy == 'sqlite': + return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount, + fee_per_byte=self.fee_per_byte) txos = await self.get_effective_amount_estimators(funding_accounts) - fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self) - selector = CoinSelector(amount, fee) spendables = selector.select(txos, self.coin_selection_strategy) if spendables: await self.reserve_outputs(s.txo for s in spendables)