wtxmgr: introduce persistent output leases

This commit allows for the ability to lease an output to a particular ID
for a limited amount of time, ensuring that no other processes can use
said output for their coin selection needs. An output can either be
unlocked manually, or lazily whenever required.
This commit is contained in:
Wilmer Paulino 2020-05-13 16:49:14 -07:00
parent 43c9c2e2f7
commit c42130075c
No known key found for this signature in database
GPG key ID: 6DF57B9F9514972F
5 changed files with 792 additions and 14 deletions

View file

@ -69,6 +69,7 @@ var (
bucketUnmined = []byte("m") bucketUnmined = []byte("m")
bucketUnminedCredits = []byte("mc") bucketUnminedCredits = []byte("mc")
bucketUnminedInputs = []byte("mi") bucketUnminedInputs = []byte("mi")
bucketLockedOutputs = []byte("lo")
) )
// Root (namespace) bucket keys // Root (namespace) bucket keys
@ -116,10 +117,10 @@ func putMinedBalance(ns walletdb.ReadWriteBucket, amt btcutil.Amount) error {
// The canonical transaction hash serialization is simply the hash. // The canonical transaction hash serialization is simply the hash.
func canonicalOutPoint(txHash *chainhash.Hash, index uint32) []byte { func canonicalOutPoint(txHash *chainhash.Hash, index uint32) []byte {
k := make([]byte, 36) var k [36]byte
copy(k, txHash[:]) copy(k[:32], txHash[:])
byteOrder.PutUint32(k[32:36], index) byteOrder.PutUint32(k[32:36], index)
return k return k[:]
} }
func readCanonicalOutPoint(k []byte, op *wire.OutPoint) error { func readCanonicalOutPoint(k []byte, op *wire.OutPoint) error {
@ -1287,6 +1288,123 @@ func deleteRawUnminedInput(ns walletdb.ReadWriteBucket, outPointKey []byte,
return nil return nil
} }
// serializeLockedOutput serializes the value of a locked output.
func serializeLockedOutput(id LockID, expiry time.Time) []byte {
var v [len(id) + 8]byte
copy(v[:len(id)], id[:])
byteOrder.PutUint64(v[len(id):], uint64(expiry.Unix()))
return v[:]
}
// deserializeLockedOutput deserializes the value of a locked output.
func deserializeLockedOutput(v []byte) (LockID, time.Time) {
var id LockID
copy(id[:], v[:len(id)])
expiry := time.Unix(int64(byteOrder.Uint64(v[len(id):])), 0)
return id, expiry
}
// isLockedOutput determines whether an output is locked. If it is, its assigned
// ID is returned, along with its absolute expiration time. If the output lock
// exists, but its expiration has been met, then the output is considered
// unlocked.
func isLockedOutput(ns walletdb.ReadBucket, op wire.OutPoint,
timeNow time.Time) (LockID, time.Time, bool) {
// The bucket may not exist, indicating that no outputs have ever been
// locked, so we can just return now.
lockedOutputs := ns.NestedReadBucket(bucketLockedOutputs)
if lockedOutputs == nil {
return LockID{}, time.Time{}, false
}
// Retrieve the output lock, if any, and extract the relevant fields.
k := canonicalOutPoint(&op.Hash, op.Index)
v := lockedOutputs.Get(k)
if v == nil {
return LockID{}, time.Time{}, false
}
lockID, expiry := deserializeLockedOutput(v)
// If the output lock has already expired, delete it now.
if !timeNow.Before(expiry) {
return LockID{}, time.Time{}, false
}
return lockID, expiry, true
}
// lockOutput creates a lock for `duration` over an output assigned to the `id`,
// preventing it from becoming eligible for coin selection.
func lockOutput(ns walletdb.ReadWriteBucket, id LockID, op wire.OutPoint,
expiry time.Time) error {
// Create the corresponding bucket if necessary.
lockedOutputs, err := ns.CreateBucketIfNotExists(bucketLockedOutputs)
if err != nil {
str := "failed to create locked outputs bucket"
return storeError(ErrDatabase, str, err)
}
// Store a mapping of outpoint -> (id, expiry).
k := canonicalOutPoint(&op.Hash, op.Index)
v := serializeLockedOutput(id, expiry)
if err := lockedOutputs.Put(k, v[:]); err != nil {
str := fmt.Sprintf("%s: put failed for %v", bucketLockedOutputs,
op)
return storeError(ErrDatabase, str, err)
}
return nil
}
// unlockOutput removes a lock over an output, making it eligible for coin
// selection if still unspent.
func unlockOutput(ns walletdb.ReadWriteBucket, op wire.OutPoint) error {
// The bucket may not exist, indicating that no outputs have ever been
// locked, so we can just return now.
lockedOutputs := ns.NestedReadWriteBucket(bucketLockedOutputs)
if lockedOutputs == nil {
return nil
}
// Delete the key-value pair representing the output lock.
k := canonicalOutPoint(&op.Hash, op.Index)
if err := lockedOutputs.Delete(k); err != nil {
str := fmt.Sprintf("%s: delete failed for %v",
bucketLockedOutputs, op)
return storeError(ErrDatabase, str, err)
}
return nil
}
// forEachLockedOutput iterates over all existing locked outputs and invokes the
// callback `f` for each.
func forEachLockedOutput(ns walletdb.ReadBucket,
f func(wire.OutPoint, LockID, time.Time)) error {
// The bucket may not exist, indicating that no outputs have ever been
// locked, so we can just return now.
lockedOutputs := ns.NestedReadBucket(bucketLockedOutputs)
if lockedOutputs == nil {
return nil
}
return lockedOutputs.ForEach(func(k, v []byte) error {
var op wire.OutPoint
if err := readCanonicalOutPoint(k, &op); err != nil {
return err
}
lockID, expiry := deserializeLockedOutput(v)
f(op, lockID, expiry)
return nil
})
}
// openStore opens an existing transaction store from the passed namespace. // openStore opens an existing transaction store from the passed namespace.
func openStore(ns walletdb.ReadBucket) error { func openStore(ns walletdb.ReadBucket) error {
version, err := fetchVersion(ns) version, err := fetchVersion(ns)
@ -1382,6 +1500,10 @@ func createBuckets(ns walletdb.ReadWriteBucket) error {
str := "failed to create unmined inputs bucket" str := "failed to create unmined inputs bucket"
return storeError(ErrDatabase, str, err) return storeError(ErrDatabase, str, err)
} }
if _, err := ns.CreateBucket(bucketLockedOutputs); err != nil {
str := "failed to create locked outputs bucket"
return storeError(ErrDatabase, str, err)
}
return nil return nil
} }
@ -1421,6 +1543,10 @@ func deleteBuckets(ns walletdb.ReadWriteBucket) error {
str := "failed to delete unmined inputs bucket" str := "failed to delete unmined inputs bucket"
return storeError(ErrDatabase, str, err) return storeError(ErrDatabase, str, err)
} }
if err := ns.DeleteNestedBucket(bucketLockedOutputs); err != nil {
str := "failed to delete locked outputs bucket"
return storeError(ErrDatabase, str, err)
}
return nil return nil
} }

View file

@ -7,4 +7,6 @@ require (
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/btcsuite/btcwallet/walletdb v1.2.0 github.com/btcsuite/btcwallet/walletdb v1.2.0
github.com/lightningnetwork/lnd/clock v1.0.1
github.com/stretchr/testify v1.5.1 // indirect
) )

View file

@ -17,6 +17,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY= github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@ -28,11 +29,18 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo=
github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q=
@ -56,3 +64,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -18,10 +18,16 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/clock"
) )
// TxLabelLimit is the length limit we impose on transaction labels. const (
const TxLabelLimit = 500 // TxLabelLimit is the length limit we impose on transaction labels.
TxLabelLimit = 500
// DefaultLockDuration is the default duration used to lock outputs.
DefaultLockDuration = 10 * time.Minute
)
var ( var (
// ErrEmptyLabel is returned when an attempt to write a label that is // ErrEmptyLabel is returned when an attempt to write a label that is
@ -40,6 +46,18 @@ var (
// ErrTxLabelNotFound is returned when no label is found for a // ErrTxLabelNotFound is returned when no label is found for a
// transaction hash. // transaction hash.
ErrTxLabelNotFound = errors.New("label for transaction not found") ErrTxLabelNotFound = errors.New("label for transaction not found")
// ErrUnknownOutput is an error returned when an output not known to the
// wallet is attempted to be locked.
ErrUnknownOutput = errors.New("unknown output")
// ErrOutputAlreadyLocked is an error returned when an output has
// already been locked to a different ID.
ErrOutputAlreadyLocked = errors.New("output already locked")
// ErrOutputUnlockNotAllowed is an error returned when an output unlock
// is attempted with a different ID than the one which locked it.
ErrOutputUnlockNotAllowed = errors.New("output unlock not alowed")
) )
// Block contains the minimum amount of data to uniquely identify any block on // Block contains the minimum amount of data to uniquely identify any block on
@ -155,25 +173,33 @@ type Credit struct {
FromCoinBase bool FromCoinBase bool
} }
// LockID represents a unique context-specific ID assigned to an output lock.
type LockID [32]byte
// Store implements a transaction store for storing and managing wallet // Store implements a transaction store for storing and managing wallet
// transactions. // transactions.
type Store struct { type Store struct {
chainParams *chaincfg.Params chainParams *chaincfg.Params
// clock is used to determine when outputs locks have expired.
clock clock.Clock
// Event callbacks. These execute in the same goroutine as the wtxmgr // Event callbacks. These execute in the same goroutine as the wtxmgr
// caller. // caller.
NotifyUnspent func(hash *chainhash.Hash, index uint32) NotifyUnspent func(hash *chainhash.Hash, index uint32)
} }
// Open opens the wallet transaction store from a walletdb namespace. If the // Open opens the wallet transaction store from a walletdb namespace. If the
// store does not exist, ErrNoExist is returned. // store does not exist, ErrNoExist is returned. `lockDuration` represents how
// long outputs are locked for.
func Open(ns walletdb.ReadBucket, chainParams *chaincfg.Params) (*Store, error) { func Open(ns walletdb.ReadBucket, chainParams *chaincfg.Params) (*Store, error) {
// Open the store. // Open the store.
err := openStore(ns) err := openStore(ns)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s := &Store{chainParams, nil} // TODO: set callbacks s := &Store{chainParams, clock.NewDefaultClock(), nil} // TODO: set callbacks
return s, nil return s, nil
} }
@ -401,7 +427,19 @@ func (s *Store) insertMinedTx(ns walletdb.ReadWriteBucket, rec *TxRecord,
// from the unconfirmed set. This also handles removing unconfirmed // from the unconfirmed set. This also handles removing unconfirmed
// transaction spend chains if any other unconfirmed transactions spend // transaction spend chains if any other unconfirmed transactions spend
// outputs of the removed double spend. // outputs of the removed double spend.
return s.removeDoubleSpends(ns, rec) if err := s.removeDoubleSpends(ns, rec); err != nil {
return err
}
// Clear any locked outputs since we now have a confirmed spend for
// them, making them not eligible for coin selection anyway.
for _, txIn := range rec.MsgTx.TxIn {
if err := unlockOutput(ns, txIn.PreviousOutPoint); err != nil {
return err
}
}
return nil
} }
// AddCredit marks a transaction record as containing a transaction output // AddCredit marks a transaction record as containing a transaction output
@ -740,11 +778,19 @@ func (s *Store) UnspentOutputs(ns walletdb.ReadBucket) ([]Credit, error) {
if err != nil { if err != nil {
return err return err
} }
// Skip the output if it's locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
return nil
}
if existsRawUnminedInput(ns, k) != nil { if existsRawUnminedInput(ns, k) != nil {
// Output is spent by an unmined transaction. // Output is spent by an unmined transaction.
// Skip this k/v pair. // Skip this k/v pair.
return nil return nil
} }
err = readUnspentBlock(v, &block) err = readUnspentBlock(v, &block)
if err != nil { if err != nil {
return err return err
@ -786,17 +832,22 @@ func (s *Store) UnspentOutputs(ns walletdb.ReadBucket) ([]Credit, error) {
} }
err = ns.NestedReadBucket(bucketUnminedCredits).ForEach(func(k, v []byte) error { err = ns.NestedReadBucket(bucketUnminedCredits).ForEach(func(k, v []byte) error {
if err := readCanonicalOutPoint(k, &op); err != nil {
return err
}
// Skip the output if it's locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
return nil
}
if existsRawUnminedInput(ns, k) != nil { if existsRawUnminedInput(ns, k) != nil {
// Output is spent by an unmined transaction. // Output is spent by an unmined transaction.
// Skip to next unmined credit. // Skip to next unmined credit.
return nil return nil
} }
err := readCanonicalOutPoint(k, &op)
if err != nil {
return err
}
// TODO(jrick): Reading/parsing the entire transaction record // TODO(jrick): Reading/parsing the entire transaction record
// just for the output amount and script can be avoided. // just for the output amount and script can be avoided.
recVal := existsRawUnmined(ns, op.Hash[:]) recVal := existsRawUnmined(ns, op.Hash[:])
@ -858,6 +909,22 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
if err != nil { if err != nil {
return err return err
} }
// Subtract the output's amount if it's locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
_, v := existsCredit(ns, &op.Hash, op.Index, &block)
amt, err := fetchRawCreditAmount(v)
if err != nil {
return err
}
bal -= amt
// To prevent decrementing the balance twice if the
// output has an unconfirmed spend, return now.
return nil
}
if existsRawUnminedInput(ns, k) != nil { if existsRawUnminedInput(ns, k) != nil {
_, v := existsCredit(ns, &op.Hash, op.Index, &block) _, v := existsCredit(ns, &op.Hash, op.Index, &block)
amt, err := fetchRawCreditAmount(v) amt, err := fetchRawCreditAmount(v)
@ -866,6 +933,7 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
} }
bal -= amt bal -= amt
} }
return nil return nil
}) })
if err != nil { if err != nil {
@ -902,7 +970,14 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
for i := uint32(0); i < numOuts; i++ { for i := uint32(0); i < numOuts; i++ {
// Avoid double decrementing the credit amount // Avoid double decrementing the credit amount
// if it was already removed for being spent by // if it was already removed for being spent by
// an unmined tx. // an unmined tx or being locked.
op = wire.OutPoint{Hash: *txHash, Index: i}
_, _, isLocked := isLockedOutput(
ns, op, s.clock.Now(),
)
if isLocked {
continue
}
opKey := canonicalOutPoint(txHash, i) opKey := canonicalOutPoint(txHash, i)
if existsRawUnminedInput(ns, opKey) != nil { if existsRawUnminedInput(ns, opKey) != nil {
continue continue
@ -935,6 +1010,17 @@ func (s *Store) Balance(ns walletdb.ReadBucket, minConf int32, syncHeight int32)
// output that is unspent. // output that is unspent.
if minConf == 0 { if minConf == 0 {
err = ns.NestedReadBucket(bucketUnminedCredits).ForEach(func(k, v []byte) error { err = ns.NestedReadBucket(bucketUnminedCredits).ForEach(func(k, v []byte) error {
if err := readCanonicalOutPoint(k, &op); err != nil {
return err
}
// Skip adding the balance for this output if it's
// locked.
_, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked {
return nil
}
if existsRawUnminedInput(ns, k) != nil { if existsRawUnminedInput(ns, k) != nil {
// Output is spent by an unmined transaction. // Output is spent by an unmined transaction.
// Skip to next unmined credit. // Skip to next unmined credit.
@ -1041,3 +1127,102 @@ func DeserializeLabel(v []byte) (string, error) {
label := string(v[2:]) label := string(v[2:])
return label, nil return label, nil
} }
// isKnownOutput returns whether the output is known to the transaction store
// either as confirmed or unconfirmed.
func isKnownOutput(ns walletdb.ReadWriteBucket, op wire.OutPoint) bool {
k := canonicalOutPoint(&op.Hash, op.Index)
if existsRawUnminedCredit(ns, k) != nil {
return true
}
if existsRawUnspent(ns, k) != nil {
return true
}
return false
}
// LockOutput locks an output to the given ID, preventing it from being
// available for coin selection. The absolute time of the lock's expiration is
// returned. The expiration of the lock can be extended by successive
// invocations of this call.
//
// Outputs can be unlocked before their expiration through `UnlockOutput`.
// Otherwise, they are unlocked lazily through calls which iterate through all
// known outputs, e.g., `Balance`, `UnspentOutputs`.
//
// If the output is not known, ErrUnknownOutput is returned. If the output has
// already been locked to a different ID, then ErrOutputAlreadyLocked is
// returned.
func (s *Store) LockOutput(ns walletdb.ReadWriteBucket, id LockID,
op wire.OutPoint) (time.Time, error) {
// Make sure the output is known.
if !isKnownOutput(ns, op) {
return time.Time{}, ErrUnknownOutput
}
// Make sure the output hasn't already been locked to some other ID.
lockedID, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if isLocked && lockedID != id {
return time.Time{}, ErrOutputAlreadyLocked
}
expiry := s.clock.Now().Add(DefaultLockDuration)
if err := lockOutput(ns, id, op, expiry); err != nil {
return time.Time{}, err
}
return expiry, nil
}
// UnlockOutput unlocks an output, allowing it to be available for coin
// selection if it remains unspent. The ID should match the one used to
// originally lock the output.
func (s *Store) UnlockOutput(ns walletdb.ReadWriteBucket, id LockID,
op wire.OutPoint) error {
// Make sure the output is known.
if !isKnownOutput(ns, op) {
return ErrUnknownOutput
}
// If the output has already been unlocked, we can return now.
lockedID, _, isLocked := isLockedOutput(ns, op, s.clock.Now())
if !isLocked {
return nil
}
// Make sure the output was locked to the same ID.
if lockedID != id {
return ErrOutputUnlockNotAllowed
}
return unlockOutput(ns, op)
}
// DeleteExpiredLockedOutputs iterates through all existing locked outputs and
// deletes those which have already expired.
func (s *Store) DeleteExpiredLockedOutputs(ns walletdb.ReadWriteBucket) error {
// Collect all expired output locks first to remove them later on. This
// is necessary as deleting while iterating would invalidate the
// iterator.
var expiredOutputs []wire.OutPoint
err := forEachLockedOutput(
ns, func(op wire.OutPoint, _ LockID, expiration time.Time) {
if !s.clock.Now().Before(expiration) {
expiredOutputs = append(expiredOutputs, op)
}
},
)
if err != nil {
return err
}
for _, op := range expiredOutputs {
if err := unlockOutput(ns, op); err != nil {
return err
}
}
return nil
}

View file

@ -19,6 +19,7 @@ import (
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
_ "github.com/btcsuite/btcwallet/walletdb/bdb" _ "github.com/btcsuite/btcwallet/walletdb/bdb"
"github.com/lightningnetwork/lnd/clock"
) )
// Received transaction output for mainnet outpoint // Received transaction output for mainnet outpoint
@ -2355,3 +2356,457 @@ func TestTxLabel(t *testing.T) {
t.Fatalf("expected: %v, got: %v", ErrTxLabelNotFound, err) t.Fatalf("expected: %v, got: %v", ErrTxLabelNotFound, err)
} }
} }
func assertBalance(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
confirmed bool, blockHeight int32, exp btcutil.Amount) {
t.Helper()
minConf := int32(0)
if confirmed {
minConf = 1
}
balance, err := s.Balance(ns, minConf, blockHeight)
if err != nil {
t.Fatal(err)
}
if balance != exp {
t.Fatalf("expected balance %v, got %v", exp, balance)
}
}
func assertUtxos(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
exp []wire.OutPoint) {
t.Helper()
utxos, err := s.UnspentOutputs(ns)
if err != nil {
t.Fatal(err)
}
for _, expUtxo := range exp {
found := false
for _, utxo := range utxos {
if expUtxo == utxo.OutPoint {
found = true
break
}
}
if !found {
t.Fatalf("expected utxo %v", expUtxo)
}
}
}
func assertLocked(t *testing.T, ns walletdb.ReadWriteBucket, op wire.OutPoint,
timeNow time.Time, exp bool) {
t.Helper()
_, _, locked := isLockedOutput(ns, op, timeNow)
if locked && locked != exp {
t.Fatalf("expected locked output %v", op)
}
if !locked && locked != exp {
t.Fatalf("unexpected locked output %v", op)
}
}
func assertOutputLocksExist(t *testing.T, s *Store, ns walletdb.ReadBucket,
exp ...wire.OutPoint) {
t.Helper()
var found []wire.OutPoint
forEachLockedOutput(ns, func(op wire.OutPoint, _ LockID, _ time.Time) {
found = append(found, op)
})
if len(found) != len(exp) {
t.Fatalf("expected to find %v locked output(s), found %v",
len(exp), len(found))
}
for _, expOp := range exp {
exists := false
for _, foundOp := range found {
if expOp == foundOp {
exists = true
break
}
}
if !exists {
t.Fatalf("expected output lock for %v to exist", expOp)
}
}
}
func lock(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
id LockID, op wire.OutPoint, exp error) time.Time {
t.Helper()
expiry, err := s.LockOutput(ns, id, op)
if err != exp {
t.Fatalf("expected err %q, got %q", exp, err)
}
if exp != nil && exp != ErrOutputAlreadyLocked {
assertLocked(t, ns, op, s.clock.Now(), false)
} else {
assertLocked(t, ns, op, s.clock.Now(), true)
}
return expiry
}
func unlock(t *testing.T, s *Store, ns walletdb.ReadWriteBucket,
id LockID, op wire.OutPoint, exp error) {
t.Helper()
if err := s.UnlockOutput(ns, id, op); err != exp {
t.Fatalf("expected err %q, got %q", exp, err)
}
if exp != nil {
assertLocked(t, ns, op, s.clock.Now(), true)
} else {
assertLocked(t, ns, op, s.clock.Now(), false)
}
}
func insertUnconfirmedCredit(t *testing.T, store *Store, db walletdb.DB,
tx *wire.MsgTx, idx uint32) {
t.Helper()
insertConfirmedCredit(t, store, db, tx, idx, nil)
}
func insertConfirmedCredit(t *testing.T, store *Store, db walletdb.DB,
tx *wire.MsgTx, idx uint32, block *BlockMeta) {
t.Helper()
commitDBTx(t, store, db, func(ns walletdb.ReadWriteBucket) {
rec, err := NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
t.Fatal(err)
}
if err := store.InsertTx(ns, rec, block); err != nil {
t.Fatal(err)
}
if err := store.AddCredit(ns, rec, block, idx, false); err != nil {
t.Fatal(err)
}
})
}
// TestOutputLocks aims to test all cases revolving output locks, ensuring they
// are and aren't eligible for coin selection after certain operations.
func TestOutputLocks(t *testing.T) {
t.Parallel()
// Define a series of constants we'll use throughout our tests.
block := &BlockMeta{
Block: Block{
Hash: chainhash.Hash{1, 3, 3, 7},
Height: 1337,
},
Time: time.Now(),
}
// Create a coinbase transaction with two outputs, which we'll spend.
coinbase := newCoinBase(
btcutil.SatoshiPerBitcoin, btcutil.SatoshiPerBitcoin*2,
)
coinbaseHash := coinbase.TxHash()
// One of the spends will be unconfirmed.
const unconfirmedBalance = btcutil.SatoshiPerBitcoin / 2
unconfirmedTx := spendOutput(&coinbaseHash, 0, unconfirmedBalance)
unconfirmedOutPoint := wire.OutPoint{
Hash: unconfirmedTx.TxHash(),
Index: 0,
}
// The other will be confirmed.
const confirmedBalance = btcutil.SatoshiPerBitcoin
confirmedTx := spendOutput(&coinbaseHash, 1, confirmedBalance)
confirmedOutPoint := wire.OutPoint{
Hash: confirmedTx.TxHash(),
Index: 0,
}
const balance = unconfirmedBalance + confirmedBalance
testCases := []struct {
name string
run func(*testing.T, *Store, walletdb.ReadWriteBucket)
}{
{
// Asserts that we cannot lock unknown outputs to the
// store.
name: "unknown output",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
lockID := LockID{1}
op := wire.OutPoint{Index: 1}
_ = lock(t, s, ns, lockID, op, ErrUnknownOutput)
},
},
{
// Asserts that we cannot lock outputs that have already
// been locked to someone else.
name: "already locked output",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
lockID1 := LockID{1}
lockID2 := LockID{2}
_ = lock(
t, s, ns, lockID1, unconfirmedOutPoint,
nil,
)
_ = lock(
t, s, ns, lockID2, unconfirmedOutPoint,
ErrOutputAlreadyLocked,
)
_ = lock(
t, s, ns, lockID1, confirmedOutPoint,
nil,
)
_ = lock(
t, s, ns, lockID2, confirmedOutPoint,
ErrOutputAlreadyLocked,
)
},
},
{
// Asserts that only the ID which locked at output can
// manually unlock it.
name: "unlock output",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
lockID1 := LockID{1}
lockID2 := LockID{2}
_ = lock(t, s, ns, lockID1, confirmedOutPoint, nil)
unlock(
t, s, ns, lockID2, confirmedOutPoint,
ErrOutputUnlockNotAllowed,
)
unlock(t, s, ns, lockID1, confirmedOutPoint, nil)
},
},
{
// Asserts that locking an output that's already locked
// with the correct ID results in an extension of the
// lock.
name: "extend locked output lease",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// Lock the output and set the clock time a
// minute before the expiration. It should
// remain locked.
lockID := LockID{1}
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
s.clock.(*clock.TestClock).SetTime(
expiry.Add(-time.Minute),
)
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
true,
)
// Lock it once again, extending its expiration,
// and set the clock time a second before the
// expiration. It should remain locked.
s.clock.(*clock.TestClock).SetTime(
expiry.Add(-time.Minute),
)
newExpiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
if !newExpiry.After(expiry) {
t.Fatal("expected output lock " +
"duration to be renewed")
}
s.clock.(*clock.TestClock).SetTime(
newExpiry.Add(-time.Second),
)
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
true,
)
// Set the clock time to the new expiration, it
// should now be unlocked.
s.clock.(*clock.TestClock).SetTime(newExpiry)
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
false,
)
},
},
{
// Asserts that balances are reflected properly after
// locking confirmed and unconfirmed outputs.
name: "balance after locked outputs",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// We should see our full balance before locking
// any outputs.
assertBalance(
t, s, ns, false, block.Height, balance,
)
// Lock all of our outputs. Our balance should
// be 0.
lockID := LockID{1}
_ = lock(
t, s, ns, lockID, unconfirmedOutPoint, nil,
)
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
assertBalance(t, s, ns, false, block.Height, 0)
// Wait for the output locks to expire, causing
// our full balance to return .
s.clock.(*clock.TestClock).SetTime(expiry)
assertBalance(
t, s, ns, false, block.Height, balance,
)
},
},
{
// Asserts that the available utxos are reflected
// properly after locking confirmed and unconfirmed
// outputs.
name: "utxos after locked outputs",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// We should see all of our utxos before locking
// any.
assertUtxos(t, s, ns, []wire.OutPoint{
unconfirmedOutPoint,
confirmedOutPoint,
})
// Lock the unconfirmed utxo, we should now only
// see the confirmed.
lockID := LockID{1}
_ = lock(t, s, ns, lockID, unconfirmedOutPoint, nil)
assertUtxos(t, s, ns, []wire.OutPoint{
confirmedOutPoint,
})
// Now lock the confirmed utxo, we should no
// longer see any utxos available.
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
assertUtxos(t, s, ns, nil)
// Wait for the output locks to expire for the
// utxos to become available once again.
s.clock.(*clock.TestClock).SetTime(expiry)
assertUtxos(t, s, ns, []wire.OutPoint{
unconfirmedOutPoint,
confirmedOutPoint,
})
},
},
{
// Asserts that output locks are removed for outputs
// which have had a confirmed spend, ensuring the
// database doesn't store stale data.
name: "clear locked outputs after confirmed spend",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// Lock an output.
lockID := LockID{1}
lock(t, s, ns, lockID, confirmedOutPoint, nil)
// Create a spend and add it to the store as
// confirmed.
txHash := confirmedTx.TxHash()
spendTx := spendOutput(&txHash, 0, 500)
spendRec, err := NewTxRecordFromMsgTx(
spendTx, time.Now(),
)
if err != nil {
t.Fatal(err)
}
err = s.InsertTx(ns, spendRec, block)
if err != nil {
t.Fatal(err)
}
// The output should no longer be locked.
assertLocked(
t, ns, confirmedOutPoint, s.clock.Now(),
false,
)
},
},
{
// Assert that deleting expired locked outputs works as
// intended.
name: "delete expired locked outputs",
run: func(t *testing.T, s *Store, ns walletdb.ReadWriteBucket) {
// Lock an output.
lockID := LockID{1}
expiry := lock(
t, s, ns, lockID, confirmedOutPoint, nil,
)
// We should expect to find it if we iterate
// over the locked outputs bucket.
assertOutputLocksExist(t, s, ns, confirmedOutPoint)
// Delete all expired locked outputs. Since the
// lock hasn't expired yet, it should still
// exist.
err := s.DeleteExpiredLockedOutputs(ns)
if err != nil {
t.Fatalf("unable to delete expired "+
"locked outputs: %v", err)
}
assertOutputLocksExist(t, s, ns, confirmedOutPoint)
// Let the output lock expired.
s.clock.(*clock.TestClock).SetTime(expiry)
// Delete all expired locked outputs. We should
// no longer see any locked outputs.
err = s.DeleteExpiredLockedOutputs(ns)
if err != nil {
t.Fatalf("unable to delete expired "+
"locked outputs: %v", err)
}
assertOutputLocksExist(t, s, ns)
},
},
}
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
store, db, teardown, err := testStore()
if err != nil {
t.Fatal(err)
}
defer teardown()
// Replace the store's default clock with a mock one in
// order to simulate a real clock and speed up our
// tests.
store.clock = clock.NewTestClock(time.Time{})
// Add the spends we created above to the store.
insertConfirmedCredit(t, store, db, confirmedTx, 0, block)
insertUnconfirmedCredit(t, store, db, unconfirmedTx, 0)
// Run the test!
commitDBTx(t, store, db, func(ns walletdb.ReadWriteBucket) {
testCase.run(t, store, ns)
})
})
}
}