Merge pull request #700 from wpaulino/persistent-output-locking

wallet: introduce persistent output leases
This commit is contained in:
Olaoluwa Osuntokun 2020-05-29 15:48:44 -07:00 committed by GitHub
commit c52dcaf17c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 847 additions and 17 deletions

6
go.sum
View file

@ -27,6 +27,7 @@ github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@ -59,6 +60,8 @@ github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
github.com/lightninglabs/neutrino v0.11.0 h1:lPpYFCtsfJX2W5zI4pWycPmbbBdr7zU+BafYdLoD6k0=
github.com/lightninglabs/neutrino v0.11.0/go.mod h1:CuhF0iuzg9Sp2HO6ZgXgayviFTn1QHdSTJlMncK80wg=
github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo=
github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg=
github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=
github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms=
github.com/lightningnetwork/lnd/ticker v1.0.0 h1:S1b60TEGoTtCe2A0yeB+ecoj/kkS4qpwh6l+AkQEZwU=
@ -71,6 +74,9 @@ github.com/onsi/gomega v1.4.1 h1:PZSj/UFNaVp3KxrzHOcS7oyuWA7LoOY/77yCTEFu21U=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q=

View file

@ -315,9 +315,9 @@ func (w *Wallet) SetChainSynced(synced bool) {
// activeData returns the currently-active receiving addresses and all unspent
// outputs. This is primarely intended to provide the parameters for a
// rescan request.
func (w *Wallet) activeData(dbtx walletdb.ReadTx) ([]btcutil.Address, []wtxmgr.Credit, error) {
func (w *Wallet) activeData(dbtx walletdb.ReadWriteTx) ([]btcutil.Address, []wtxmgr.Credit, error) {
addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey)
txmgrNs := dbtx.ReadBucket(wtxmgrNamespaceKey)
txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
var addrs []btcutil.Address
err := w.Manager.ForEachRelevantActiveAddress(
@ -329,6 +329,16 @@ func (w *Wallet) activeData(dbtx walletdb.ReadTx) ([]btcutil.Address, []wtxmgr.C
if err != nil {
return nil, nil, err
}
// Before requesting the list of spendable UTXOs, we'll delete any
// expired output locks.
err = w.TxStore.DeleteExpiredLockedOutputs(
dbtx.ReadWriteBucket(wtxmgrNamespaceKey),
)
if err != nil {
return nil, nil, err
}
unspent, err := w.TxStore.UnspentOutputs(txmgrNs)
return addrs, unspent, err
}
@ -508,7 +518,7 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error {
addrs []btcutil.Address
unspent []wtxmgr.Credit
)
err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error {
err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error {
addrs, unspent, err = w.activeData(dbtx)
return err
})
@ -2854,6 +2864,42 @@ func (w *Wallet) LockedOutpoints() []btcjson.TransactionInput {
return locked
}
// LeaseOutput locks an output to the given ID, preventing it from being
// available for coin selection. The absolute time of the lock's expiration is
// returned. The expiration of the lock can be extended by successive
// invocations of this call.
//
// Outputs can be unlocked before their expiration through `UnlockOutput`.
// Otherwise, they are unlocked lazily through calls which iterate through all
// known outputs, e.g., `CalculateBalance`, `ListUnspent`.
//
// If the output is not known, ErrUnknownOutput is returned. If the output has
// already been locked to a different ID, then ErrOutputAlreadyLocked is
// returned.
//
// NOTE: This differs from LockOutpoint in that outputs are locked for a limited
// amount of time and their locks are persisted to disk.
func (w *Wallet) LeaseOutput(id wtxmgr.LockID, op wire.OutPoint) (time.Time, error) {
var expiry time.Time
err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(wtxmgrNamespaceKey)
var err error
expiry, err = w.TxStore.LockOutput(ns, id, op)
return err
})
return expiry, err
}
// ReleaseOutput unlocks an output, allowing it to be available for coin
// selection if it remains unspent. The ID should match the one used to
// originally lock the output.
func (w *Wallet) ReleaseOutput(id wtxmgr.LockID, op wire.OutPoint) error {
return walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(wtxmgrNamespaceKey)
return w.TxStore.UnlockOutput(ns, id, op)
})
}
// resendUnminedTxs iterates through all transactions that spend from wallet
// credits that are not known to have been mined into a block, and attempts
// to send each to the chain server for relay.

View file

@ -69,6 +69,7 @@ var (
bucketUnmined = []byte("m")
bucketUnminedCredits = []byte("mc")
bucketUnminedInputs = []byte("mi")
bucketLockedOutputs = []byte("lo")
)
// Root (namespace) bucket keys
@ -116,10 +117,10 @@ func putMinedBalance(ns walletdb.ReadWriteBucket, amt btcutil.Amount) error {
// The canonical transaction hash serialization is simply the hash.
func canonicalOutPoint(txHash *chainhash.Hash, index uint32) []byte {
k := make([]byte, 36)
copy(k, txHash[:])
var k [36]byte
copy(k[:32], txHash[:])
byteOrder.PutUint32(k[32:36], index)
return k
return k[:]
}
func readCanonicalOutPoint(k []byte, op *wire.OutPoint) error {
@ -1287,6 +1288,123 @@ func deleteRawUnminedInput(ns walletdb.ReadWriteBucket, outPointKey []byte,
return nil
}
// serializeLockedOutput serializes the value of a locked output.
func serializeLockedOutput(id LockID, expiry time.Time) []byte {
var v [len(id) + 8]byte
copy(v[:len(id)], id[:])
byteOrder.PutUint64(v[len(id):], uint64(expiry.Unix()))
return v[:]
}
// deserializeLockedOutput deserializes the value of a locked output.
func deserializeLockedOutput(v []byte) (LockID, time.Time) {
var id LockID
copy(id[:], v[:len(id)])
expiry := time.Unix(int64(byteOrder.Uint64(v[len(id):])), 0)
return id, expiry
}
// isLockedOutput determines whether an output is locked. If it is, its assigned
// ID is returned, along with its absolute expiration time. If the output lock
// exists, but its expiration has been met, then the output is considered
// unlocked.
func isLockedOutput(ns walletdb.ReadBucket, op wire.OutPoint,
timeNow time.Time) (LockID, time.Time, bool) {
// The bucket may not exist, indicating that no outputs have ever been
// locked, so we can just return now.
lockedOutputs := ns.NestedReadBucket(bucketLockedOutputs)
if lockedOutputs == nil {
return LockID{}, time.Time{}, false
}
// Retrieve the output lock, if any, and extract the relevant fields.
k := canonicalOutPoint(&op.Hash, op.Index)
v := lockedOutputs.Get(k)
if v == nil {
return LockID{}, time.Time{}, false
}
lockID, expiry := deserializeLockedOutput(v)
// If the output lock has already expired, delete it now.
if !timeNow.Before(expiry) {
return LockID{}, time.Time{}, false
}
return lockID, expiry, true
}
// lockOutput creates a lock for `duration` over an output assigned to the `id`,
// preventing it from becoming eligible for coin selection.
func lockOutput(ns walletdb.ReadWriteBucket, id LockID, op wire.OutPoint,
expiry time.Time) error {
// Create the corresponding bucket if necessary.
lockedOutputs, err := ns.CreateBucketIfNotExists(bucketLockedOutputs)
if err != nil {
str := "failed to create locked outputs bucket"
return storeError(ErrDatabase, str, err)
}
// Store a mapping of outpoint -> (id, expiry).
k := canonicalOutPoint(&op.Hash, op.Index)
v := serializeLockedOutput(id, expiry)
if err := lockedOutputs.Put(k, v[:]); err != nil {
str := fmt.Sprintf("%s: put failed for %v", bucketLockedOutputs,
op)
return storeError(ErrDatabase, str, err)
}
return nil
}
// unlockOutput removes a lock over an output, making it eligible for coin
// selection if still unspent.
func unlockOutput(ns walletdb.ReadWriteBucket, op wire.OutPoint) error {
// The bucket may not exist, indicating that no outputs have ever been
// locked, so we can just return now.
lockedOutputs := ns.NestedReadWriteBucket(bucketLockedOutputs)
if lockedOutputs == nil {
return nil
}
// Delete the key-value pair representing the output lock.
k := canonicalOutPoint(&op.Hash, op.Index)
if err := lockedOutputs.Delete(k); err != nil {
str := fmt.Sprintf("%s: delete failed for %v",
bucketLockedOutputs, op)
return storeError(ErrDatabase, str, err)
}
return nil
}
// forEachLockedOutput iterates over all existing locked outputs and invokes the
// callback `f` for each.
func forEachLockedOutput(ns walletdb.ReadBucket,
f func(wire.OutPoint, LockID, time.Time)) error {
// The bucket may not exist, indicating that no outputs have ever been
// locked, so we can just return now.
lockedOutputs := ns.NestedReadBucket(bucketLockedOutputs)
if lockedOutputs == nil {
return nil
}
return lockedOutputs.ForEach(func(k, v []byte) error {
var op wire.OutPoint
if err := readCanonicalOutPoint(k, &op); err != nil {
return err
}
lockID, expiry := deserializeLockedOutput(v)
f(op, lockID, expiry)
return nil
})
}
// openStore opens an existing transaction store from the passed namespace.
func openStore(ns walletdb.ReadBucket) error {
version, err := fetchVersion(ns)
@ -1382,6 +1500,10 @@ func createBuckets(ns walletdb.ReadWriteBucket) error {
str := "failed to create unmined inputs bucket"
return storeError(ErrDatabase, str, err)
}
if _, err := ns.CreateBucket(bucketLockedOutputs); err != nil {
str := "failed to create locked outputs bucket"
return storeError(ErrDatabase, str, err)
}
return nil
}
@ -1421,6 +1543,10 @@ func deleteBuckets(ns walletdb.ReadWriteBucket) error {
str := "failed to delete unmined inputs bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketLockedOutputs); err != nil {
str := "failed to delete locked outputs bucket"
return storeError(ErrDatabase, str, err)
}
return nil
}

View file

@ -7,4 +7,6 @@ require (
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/btcsuite/btcwallet/walletdb v1.2.0
github.com/lightningnetwork/lnd/clock v1.0.1
github.com/stretchr/testify v1.5.1 // indirect
)

View file

@ -17,6 +17,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@ -28,11 +29,18 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo=
github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q=
@ -56,3 +64,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -18,10 +18,16 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/clock"
)
// TxLabelLimit is the length limit we impose on transaction labels.
const TxLabelLimit = 500
const (
// TxLabelLimit is the length limit we impose on transaction labels.
TxLabelLimit = 500
// DefaultLockDuration is the default duration used to lock outputs.
DefaultLockDuration = 10 * time.Minute
)
var (
// ErrEmptyLabel is returned when an attempt to write a label that is
@ -40,6 +46,18 @@ var (
// ErrTxLabelNotFound is returned when no label is found for a
// transaction hash.
ErrTxLabelNotFound = errors.New("label for transaction not found")
// ErrUnknownOutput is an error returned when an output not known to the
// wallet is attempted to be locked.
ErrUnknownOutput = errors.New("unknown output")
// ErrOutputAlreadyLocked is an error returned when an output has
// already been locked to a different ID.
ErrOutputAlreadyLocked = errors.New("output already locked")
// ErrOutputUnlockNotAllowed is an error returned when an output unlock
// is attempted with a different ID than the one which locked it.
ErrOutputUnlockNotAllowed = errors.New("output unlock not alowed")
)
// Block contains the minimum amount of data to uniquely identify any block on
@ -155,25 +173,33 @@ type Credit struct {
FromCoinBase bool
}
// LockID represents a unique context-specific ID assigned to an output lock.
type LockID [32]byte
// Store implements a transaction store for storing and managing wallet
// transactions.
type Store struct {
chainParams *chaincfg.Params
// clock is used to determine when outputs locks have expired.
clock clock.Clock
// Event callbacks. These execute in the same goroutine as the wtxmgr
// caller.
NotifyUnspent func(hash *chainhash.Hash, index uint32)
}
// Open opens the wallet transaction store from a walletdb namespace. If the
// store does not exist, ErrNoExist is returned.
// store does not exist, ErrNoExist is returned. `lockDuration` represents how
// long outputs are locked for.
func Open(ns walletdb.ReadBucket, chainParams *chaincfg.Params) (*Store, error) {
// Open the store.
err := openStore(ns)
if err != nil {
return nil, err
}
s := &Store{chainParams, nil} // TODO: set callbacks
s := &Store{chainParams, clock.NewDefaultClock(), nil} // TODO: set callbacks
return s, nil
}
@ -401,7 +427,19 @@ func (s *Store) insertMinedTx(ns walletdb.ReadWriteBucket, rec *TxRecord,
// from the unconfirmed set. This also handles removing unconfirmed
// transaction spend chains if any other unconfirmed transactions spend
// outputs of the removed double spend.
return s.removeDoubleSpends(ns, rec)
if err := s.removeDoubleSpends(ns, rec); err != nil {
return err
}
// Clear any locked outputs since we now have a confirmed spend for
// them, making them not eligible for coin selection anyway.
for _, txIn := range rec.MsgTx.TxIn {
if err := unlockOutput(ns, txIn.PreviousOutPoint); err != nil {
return err
}
}
return nil
}
// AddCredit marks a transaction record as containing a transaction output
@ -740,11 +778,19 @@ func (s *Store) UnspentOutputs(ns walletdb.ReadBucket) ([]Credit, error) {
if err != nil {
return err
}
// Skip the output if it's locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
return nil
}
if existsRawUnminedInput(ns, k) != nil {
// Output is spent by an unmined transaction.
// Skip this k/v pair.
return nil
}
err = readUnspentBlock(v, &block)
if err != nil {
return err
@ -786,17 +832,22 @@ func (s *Store) UnspentOutputs(ns walletdb.ReadBucket) ([]Credit, error) {
}
err = ns.NestedReadBucket(bucketUnminedCredits).ForEach(func(k, v []byte) error {
if err := readCanonicalOutPoint(k, &op); err != nil {
return err
}
// Skip the output if it's locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
return nil
}
if existsRawUnminedInput(ns, k) != nil {
// Output is spent by an unmined transaction.
// Skip to next unmined credit.
return nil
}
err := readCanonicalOutPoint(k, &op)
if err != nil {
return err
}
// TODO(jrick): Reading/parsing the entire transaction record
// just for the output amount and script can be avoided.
recVal := existsRawUnmined(ns, op.Hash[:])
@ -858,6 +909,22 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
if err != nil {
return err
}
// Subtract the output's amount if it's locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
_, v := existsCredit(ns, &op.Hash, op.Index, &block)
amt, err := fetchRawCreditAmount(v)
if err != nil {
return err
}
bal -= amt
// To prevent decrementing the balance twice if the
// output has an unconfirmed spend, return now.
return nil
}
if existsRawUnminedInput(ns, k) != nil {
_, v := existsCredit(ns, &op.Hash, op.Index, &block)
amt, err := fetchRawCreditAmount(v)
@ -866,6 +933,7 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
}
bal -= amt
}
return nil
})
if err != nil {
@ -902,7 +970,14 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
for i := uint32(0); i < numOuts; i++ {
// Avoid double decrementing the credit amount
// if it was already removed for being spent by
// an unmined tx.
// an unmined tx or being locked.
op = wire.OutPoint{Hash: *txHash, Index: i}
_, _, isLocked := isLockedOutput(
ns, op, s.clock.Now(),
)
if isLocked {
continue
}
opKey := canonicalOutPoint(txHash, i)
if existsRawUnminedInput(ns, opKey) != nil {
continue
@ -935,6 +1010,17 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
// output that is unspent.
if minConf == 0 {
err = ns.NestedReadBucket(bucketUnminedCredits).ForEach(func(k, v []byte) error {
if err := readCanonicalOutPoint(k, &op); err != nil {
return err
}
// Skip adding the balance for this output if it's
// locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
return nil
}
if existsRawUnminedInput(ns, k) != nil {
// Output is spent by an unmined transaction.
// Skip to next unmined credit.
@ -1041,3 +1127,102 @@ func DeserializeLabel(v []byte) (string, error) {
label := string(v[2:])
return label, nil
}
// isKnownOutput returns whether the output is known to the transaction store
// either as confirmed or unconfirmed.
func isKnownOutput(ns walletdb.ReadWriteBucket, op wire.OutPoint) bool {
k := canonicalOutPoint(&op.Hash, op.Index)
if existsRawUnminedCredit(ns, k) != nil {
return true
}
if existsRawUnspent(ns, k) != nil {
return true
}
return false
}
// LockOutput locks an output to the given ID, preventing it from being
// available for coin selection. The absolute time of the lock's expiration is
// returned. The expiration of the lock can be extended by successive
// invocations of this call.
//
// Outputs can be unlocked before their expiration through `UnlockOutput`.
// Otherwise, they are unlocked lazily through calls which iterate through all
// known outputs, e.g., `Balance`, `UnspentOutputs`.
//
// If the output is not known, ErrUnknownOutput is returned. If the output has
// already been locked to a different ID, then ErrOutputAlreadyLocked is
// returned.
func (s *Store) LockOutput(ns walletdb.ReadWriteBucket, id LockID,
op wire.OutPoint) (time.Time, error) {
// Make sure the output is known.
if !isKnownOutput(ns, op) {
return time.Time{}, ErrUnknownOutput
}
// Make sure the output hasn't already been locked to some other ID.
lockedID, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked && lockedID != id {
return time.Time{}, ErrOutputAlreadyLocked
}
expiry := s.clock.Now().Add(DefaultLockDuration)
if err := lockOutput(ns, id, op, expiry); err != nil {
return time.Time{}, err
}
return expiry, nil
}
// UnlockOutput unlocks an output, allowing it to be available for coin
// selection if it remains unspent. The ID should match the one used to
// originally lock the output.
func (s *Store) UnlockOutput(ns walletdb.ReadWriteBucket, id LockID,
op wire.OutPoint) error {
// Make sure the output is known.
if !isKnownOutput(ns, op) {
return ErrUnknownOutput
}
// If the output has already been unlocked, we can return now.
lockedID, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if !isLocked {
return nil
}
// Make sure the output was locked to the same ID.
if lockedID != id {
return ErrOutputUnlockNotAllowed
}
return unlockOutput(ns, op)
}
// DeleteExpiredLockedOutputs iterates through all existing locked outputs and
// deletes those which have already expired.
func (s *Store) DeleteExpiredLockedOutputs(ns walletdb.ReadWriteBucket) error {
// Collect all expired output locks first to remove them later on. This
// is necessary as deleting while iterating would invalidate the
// iterator.
var expiredOutputs []wire.OutPoint
err := forEachLockedOutput(
ns, func(op wire.OutPoint, _ LockID, expiration time.Time) {
if !s.clock.Now().Before(expiration) {
expiredOutputs = append(expiredOutputs, op)
}
},
)
if err != nil {
return err
}
for _, op := range expiredOutputs {
if err := unlockOutput(ns, op); err != nil {
return err
}
}
return nil
}

View file

@ -19,6 +19,7 @@ import (
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb"
_ "github.com/btcsuite/btcwallet/walletdb/bdb"
"github.com/lightningnetwork/lnd/clock"
)
// Received transaction output for mainnet outpoint
@ -2355,3 +2356,457 @@ func TestTxLabel(t *testing.T) {
t.Fatalf("expected: %v, got: %v", ErrTxLabelNotFound, err)
}
}
func assertBalance(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
confirmed bool, blockHeight int32, exp btcutil.Amount) {
t.Helper()
minConf := int32(0)
if confirmed {
minConf = 1
}
balance, err := s.Balance(ns, minConf, blockHeight)
if err != nil {
t.Fatal(err)
}
if balance != exp {
t.Fatalf("expected balance %v, got %v", exp, balance)
}
}
func assertUtxos(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
exp []wire.OutPoint) {
t.Helper()
utxos, err := s.UnspentOutputs(ns)
if err != nil {
t.Fatal(err)
}
for _, expUtxo := range exp {
found := false
for _, utxo := range utxos {
if expUtxo == utxo.OutPoint {
found = true
break
}
}
if !found {
t.Fatalf("expected utxo %v", expUtxo)
}
}
}
func assertLocked(t *testing.T, ns walletdb.ReadWriteBucket, op wire.OutPoint,
timeNow time.Time, exp bool) {
t.Helper()
_, _, locked := isLockedOutput(ns, op, timeNow)
if locked && locked != exp {
t.Fatalf("expected locked output %v", op)
}
if !locked && locked != exp {
t.Fatalf("unexpected locked output %v", op)
}
}
func assertOutputLocksExist(t *testing.T, s *Store, ns walletdb.ReadBucket,
exp ...wire.OutPoint) {
t.Helper()
var found []wire.OutPoint
forEachLockedOutput(ns, func(op wire.OutPoint, _ LockID, _ time.Time) {
found = append(found, op)
})
if len(found) != len(exp) {
t.Fatalf("expected to find %v locked output(s), found %v",
len(exp), len(found))
}
for _, expOp := range exp {
exists := false
for _, foundOp := range found {
if expOp == foundOp {
exists = true
break
}
}
if !exists {
t.Fatalf("expected output lock for %v to exist", expOp)
}
}
}
func lock(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
id LockID, op wire.OutPoint, exp error) time.Time {
t.Helper()
expiry, err := s.LockOutput(ns, id, op)
if err != exp {
t.Fatalf("expected err %q, got %q", exp, err)
}
if exp != nil && exp != ErrOutputAlreadyLocked {
assertLocked(t, ns, op, s.clock.Now(), false)
} else {
assertLocked(t, ns, op, s.clock.Now(), true)
}
return expiry
}
func unlock(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
id LockID, op wire.OutPoint, exp error) {
t.Helper()
if err := s.UnlockOutput(ns, id, op); err != exp {
t.Fatalf("expected err %q, got %q", exp, err)
}
if exp != nil {
assertLocked(t, ns, op, s.clock.Now(), true)
} else {
assertLocked(t, ns, op, s.clock.Now(), false)
}
}
func insertUnconfirmedCredit(t *testing.T, store *Store, db walletdb.DB,
tx *wire.MsgTx, idx uint32) {
t.Helper()
insertConfirmedCredit(t, store, db, tx, idx, nil)
}
func insertConfirmedCredit(t *testing.T, store *Store, db walletdb.DB,
tx *wire.MsgTx, idx uint32, block *BlockMeta) {
t.Helper()
commitDBTx(t, store, db, func(ns walletdb.ReadWriteBucket) {
rec, err := NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
t.Fatal(err)
}
if err := store.InsertTx(ns, rec, block); err != nil {
t.Fatal(err)
}
if err := store.AddCredit(ns, rec, block, idx, false); err != nil {
t.Fatal(err)
}
})
}
// TestOutputLocks aims to test all cases revolving output locks, ensuring they
// are and aren't eligible for coin selection after certain operations.
func TestOutputLocks(t *testing.T) {
t.Parallel()
// Define a series of constants we'll use throughout our tests.
block := &BlockMeta{
Block: Block{
Hash: chainhash.Hash{1, 3, 3, 7},
Height: 1337,
},
Time: time.Now(),
}
// Create a coinbase transaction with two outputs, which we'll spend.
coinbase := newCoinBase(
btcutil.SatoshiPerBitcoin, btcutil.SatoshiPerBitcoin*2,
)
coinbaseHash := coinbase.TxHash()
// One of the spends will be unconfirmed.
const unconfirmedBalance = btcutil.SatoshiPerBitcoin / 2
unconfirmedTx := spendOutput(&coinbaseHash, 0, unconfirmedBalance)
unconfirmedOutPoint := wire.OutPoint{
Hash: unconfirmedTx.TxHash(),
Index: 0,
}
// The other will be confirmed.
const confirmedBalance = btcutil.SatoshiPerBitcoin
confirmedTx := spendOutput(&coinbaseHash, 1, confirmedBalance)
confirmedOutPoint := wire.OutPoint{
Hash: confirmedTx.TxHash(),
Index: 0,
}
const balance = unconfirmedBalance + confirmedBalance
testCases := []struct {
name string
run func(*testing.T, *Store, walletdb.ReadWriteBucket)
}{
{
// Asserts that we cannot lock unknown outputs to the
// store.
name: "unknown output",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
lockID := LockID{1}
op := wire.OutPoint{Index: 1}
_ = lock(t, s, ns, lockID, op, ErrUnknownOutput)
},
},
{
// Asserts that we cannot lock outputs that have already
// been locked to someone else.
name: "already locked output",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
lockID1 := LockID{1}
lockID2 := LockID{2}
_ = lock(
t, s, ns, lockID1, unconfirmedOutPoint,
nil,
)
_ = lock(
t, s, ns, lockID2, unconfirmedOutPoint,
ErrOutputAlreadyLocked,
)
_ = lock(
t, s, ns, lockID1, confirmedOutPoint,
nil,
)
_ = lock(
t, s, ns, lockID2, confirmedOutPoint,
ErrOutputAlreadyLocked,
)
},
},
{
// Asserts that only the ID which locked at output can
// manually unlock it.
name: "unlock output",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
lockID1 := LockID{1}
lockID2 := LockID{2}
_ = lock(t, s, ns, lockID1, confirmedOutPoint, nil)
unlock(
t, s, ns, lockID2, confirmedOutPoint,
ErrOutputUnlockNotAllowed,
)
unlock(t, s, ns, lockID1, confirmedOutPoint, nil)
},
},
{
// Asserts that locking an output that's already locked
// with the correct ID results in an extension of the
// lock.
name: "extend locked output lease",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// Lock the output and set the clock time a
// minute before the expiration. It should
// remain locked.
lockID := LockID{1}
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
s.clock.(*clock.TestClock).SetTime(
expiry.Add(-time.Minute),
)
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
true,
)
// Lock it once again, extending its expiration,
// and set the clock time a second before the
// expiration. It should remain locked.
s.clock.(*clock.TestClock).SetTime(
expiry.Add(-time.Minute),
)
newExpiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
if !newExpiry.After(expiry) {
t.Fatal("expected output lock " +
"duration to be renewed")
}
s.clock.(*clock.TestClock).SetTime(
newExpiry.Add(-time.Second),
)
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
true,
)
// Set the clock time to the new expiration, it
// should now be unlocked.
s.clock.(*clock.TestClock).SetTime(newExpiry)
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
false,
)
},
},
{
// Asserts that balances are reflected properly after
// locking confirmed and unconfirmed outputs.
name: "balance after locked outputs",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// We should see our full balance before locking
// any outputs.
assertBalance(
t, s, ns, false, block.Height, balance,
)
// Lock all of our outputs. Our balance should
// be 0.
lockID := LockID{1}
_ = lock(
t, s, ns, lockID, unconfirmedOutPoint, nil,
)
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
assertBalance(t, s, ns, false, block.Height, 0)
// Wait for the output locks to expire, causing
// our full balance to return .
s.clock.(*clock.TestClock).SetTime(expiry)
assertBalance(
t, s, ns, false, block.Height, balance,
)
},
},
{
// Asserts that the available utxos are reflected
// properly after locking confirmed and unconfirmed
// outputs.
name: "utxos after locked outputs",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// We should see all of our utxos before locking
// any.
assertUtxos(t, s, ns, []wire.OutPoint{
unconfirmedOutPoint,
confirmedOutPoint,
})
// Lock the unconfirmed utxo, we should now only
// see the confirmed.
lockID := LockID{1}
_ = lock(t, s, ns, lockID, unconfirmedOutPoint, nil)
assertUtxos(t, s, ns, []wire.OutPoint{
confirmedOutPoint,
})
// Now lock the confirmed utxo, we should no
// longer see any utxos available.
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
assertUtxos(t, s, ns, nil)
// Wait for the output locks to expire for the
// utxos to become available once again.
s.clock.(*clock.TestClock).SetTime(expiry)
assertUtxos(t, s, ns, []wire.OutPoint{
unconfirmedOutPoint,
confirmedOutPoint,
})
},
},
{
// Asserts that output locks are removed for outputs
// which have had a confirmed spend, ensuring the
// database doesn't store stale data.
name: "clear locked outputs after confirmed spend",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// Lock an output.
lockID := LockID{1}
lock(t, s, ns, lockID, confirmedOutPoint, nil)
// Create a spend and add it to the store as
// confirmed.
txHash := confirmedTx.TxHash()
spendTx := spendOutput(&txHash, 0, 500)
spendRec, err := NewTxRecordFromMsgTx(
spendTx, time.Now(),
)
if err != nil {
t.Fatal(err)
}
err = s.InsertTx(ns, spendRec, block)
if err != nil {
t.Fatal(err)
}
// The output should no longer be locked.
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
false,
)
},
},
{
// Assert that deleting expired locked outputs works as
// intended.
name: "delete expired locked outputs",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// Lock an output.
lockID := LockID{1}
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
// We should expect to find it if we iterate
// over the locked outputs bucket.
assertOutputLocksExist(t, s, ns, confirmedOutPoint)
// Delete all expired locked outputs. Since the
// lock hasn't expired yet, it should still
// exist.
err := s.DeleteExpiredLockedOutputs(ns)
if err != nil {
t.Fatalf("unable to delete expired "+
"locked outputs: %v", err)
}
assertOutputLocksExist(t, s, ns, confirmedOutPoint)
// Let the output lock expired.
s.clock.(*clock.TestClock).SetTime(expiry)
// Delete all expired locked outputs. We should
// no longer see any locked outputs.
err = s.DeleteExpiredLockedOutputs(ns)
if err != nil {
t.Fatalf("unable to delete expired "+
"locked outputs: %v", err)
}
assertOutputLocksExist(t, s, ns)
},
},
}
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
store, db, teardown, err := testStore()
if err != nil {
t.Fatal(err)
}
defer teardown()
// Replace the store's default clock with a mock one in
// order to simulate a real clock and speed up our
// tests.
store.clock = clock.NewTestClock(time.Time{})
// Add the spends we created above to the store.
insertConfirmedCredit(t, store, db, confirmedTx, 0, block)
insertUnconfirmedCredit(t, store, db, unconfirmedTx, 0)
// Run the test!
commitDBTx(t, store, db, func(ns walletdb.ReadWriteBucket) {
testCase.run(t, store, ns)
})
})
}
}