forked from LBRYCommunity/lbry-sdk
166 lines
6 KiB
Python
166 lines
6 KiB
Python
from random import Random
|
|
from typing import List
|
|
|
|
from lbry.blockchain.transaction import Input, Output
|
|
|
|
MAXIMUM_TRIES = 100000
|
|
|
|
COIN_SELECTION_STRATEGIES = []
|
|
|
|
|
|
def strategy(method):
|
|
COIN_SELECTION_STRATEGIES.append(method.__name__)
|
|
return method
|
|
|
|
|
|
class OutputEffectiveAmountEstimator:
|
|
|
|
__slots__ = 'txo', 'txi', 'fee', 'effective_amount'
|
|
|
|
def __init__(self, ledger, txo: Output) -> None:
|
|
self.txo = txo
|
|
self.txi = Input.spend(txo)
|
|
self.fee: int = self.txi.get_fee(ledger)
|
|
self.effective_amount: int = txo.amount - self.fee
|
|
|
|
def __lt__(self, other):
|
|
return self.effective_amount < other.effective_amount
|
|
|
|
|
|
class CoinSelector:
|
|
|
|
def __init__(self, target: int, cost_of_change: int, seed: str = None) -> None:
|
|
self.target = target
|
|
self.cost_of_change = cost_of_change
|
|
self.exact_match = False
|
|
self.tries = 0
|
|
self.random = Random(seed)
|
|
if seed is not None:
|
|
self.random.seed(seed, version=1)
|
|
|
|
def select(
|
|
self, txos: List[OutputEffectiveAmountEstimator],
|
|
strategy_name: str = None) -> List[OutputEffectiveAmountEstimator]:
|
|
if not txos:
|
|
return []
|
|
available = sum(c.effective_amount for c in txos)
|
|
if self.target > available:
|
|
return []
|
|
return getattr(self, strategy_name or "standard")(txos, available)
|
|
|
|
@strategy
|
|
def prefer_confirmed(self, txos: List[OutputEffectiveAmountEstimator],
|
|
available: int) -> List[OutputEffectiveAmountEstimator]:
|
|
return (
|
|
self.only_confirmed(txos, available) or
|
|
self.standard(txos, available)
|
|
)
|
|
|
|
@strategy
|
|
def only_confirmed(self, txos: List[OutputEffectiveAmountEstimator],
|
|
_) -> List[OutputEffectiveAmountEstimator]:
|
|
confirmed = [t for t in txos if t.txo.tx_ref and t.txo.tx_ref.height > 0]
|
|
if not confirmed:
|
|
return []
|
|
confirmed_available = sum(c.effective_amount for c in confirmed)
|
|
if self.target > confirmed_available:
|
|
return []
|
|
return self.standard(confirmed, confirmed_available)
|
|
|
|
@strategy
|
|
def standard(self, txos: List[OutputEffectiveAmountEstimator],
|
|
available: int) -> List[OutputEffectiveAmountEstimator]:
|
|
return (
|
|
self.branch_and_bound(txos, available) or
|
|
self.closest_match(txos, available) or
|
|
self.random_draw(txos, available)
|
|
)
|
|
|
|
@strategy
|
|
def branch_and_bound(self, txos: List[OutputEffectiveAmountEstimator],
|
|
available: int) -> List[OutputEffectiveAmountEstimator]:
|
|
# see bitcoin implementation for more info:
|
|
# https://github.com/bitcoin/bitcoin/blob/master/src/wallet/coinselection.cpp
|
|
|
|
txos.sort(reverse=True)
|
|
|
|
current_value = 0
|
|
current_available_value = available
|
|
current_selection: List[bool] = []
|
|
best_waste = self.cost_of_change
|
|
best_selection: List[bool] = []
|
|
|
|
while self.tries < MAXIMUM_TRIES:
|
|
self.tries += 1
|
|
|
|
backtrack = False
|
|
if current_value + current_available_value < self.target or \
|
|
current_value > self.target + self.cost_of_change:
|
|
backtrack = True
|
|
elif current_value >= self.target:
|
|
new_waste = current_value - self.target
|
|
if new_waste <= best_waste:
|
|
best_waste = new_waste
|
|
best_selection = current_selection[:]
|
|
backtrack = True
|
|
|
|
if backtrack:
|
|
while current_selection and not current_selection[-1]:
|
|
current_selection.pop()
|
|
current_available_value += txos[len(current_selection)].effective_amount
|
|
|
|
if not current_selection:
|
|
break
|
|
|
|
current_selection[-1] = False
|
|
utxo = txos[len(current_selection) - 1]
|
|
current_value -= utxo.effective_amount
|
|
|
|
else:
|
|
utxo = txos[len(current_selection)]
|
|
current_available_value -= utxo.effective_amount
|
|
previous_utxo = txos[len(current_selection) - 1] if current_selection else None
|
|
if current_selection and not current_selection[-1] and previous_utxo and \
|
|
utxo.effective_amount == previous_utxo.effective_amount and \
|
|
utxo.fee == previous_utxo.fee:
|
|
current_selection.append(False)
|
|
else:
|
|
current_selection.append(True)
|
|
current_value += utxo.effective_amount
|
|
|
|
if best_selection:
|
|
self.exact_match = True
|
|
return [
|
|
txos[i] for i, include in enumerate(best_selection) if include
|
|
]
|
|
|
|
return []
|
|
|
|
@strategy
|
|
def closest_match(self, txos: List[OutputEffectiveAmountEstimator],
|
|
_) -> List[OutputEffectiveAmountEstimator]:
|
|
""" Pick one UTXOs that is larger than the target but with the smallest change. """
|
|
target = self.target + self.cost_of_change
|
|
smallest_change = None
|
|
best_match = None
|
|
for txo in txos:
|
|
if txo.effective_amount >= target:
|
|
change = txo.effective_amount - target
|
|
if smallest_change is None or change < smallest_change:
|
|
smallest_change, best_match = change, txo
|
|
return [best_match] if best_match else []
|
|
|
|
@strategy
|
|
def random_draw(self, txos: List[OutputEffectiveAmountEstimator],
|
|
_) -> List[OutputEffectiveAmountEstimator]:
|
|
""" Accumulate UTXOs at random until there is enough to cover the target. """
|
|
target = self.target + self.cost_of_change
|
|
self.random.shuffle(txos, self.random.random)
|
|
selection = []
|
|
amount = 0
|
|
for coin in txos:
|
|
selection.append(coin)
|
|
amount += coin.effective_amount
|
|
if amount >= target:
|
|
return selection
|
|
return []
|