Merge pull request #757 from guggero/wallet-db-tx

wallet+walletdb: remove manual DB transactions, use custom `View/Update` implementation when provided
This commit is contained in:
Olaoluwa Osuntokun 2021-08-02 17:40:36 -07:00 committed by GitHub
commit eebed51155
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 333 additions and 285 deletions

View file

@ -95,7 +95,7 @@ func (s secretSource) GetScript(addr btcutil.Address) ([]byte, error) {
}
// txToOutputs creates a signed transaction which includes each output from
// outputs. Previous outputs to reedeem are chosen from the passed account's
// outputs. Previous outputs to redeem are chosen from the passed account's
// UTXO set and minconf policy. An additional output may be added to return
// change to the wallet. This output will have an address generated from the
// given key scope and account. If a key scope is not specified, the address
@ -109,148 +109,159 @@ func (s secretSource) GetScript(addr btcutil.Address) ([]byte, error) {
func (w *Wallet) txToOutputs(outputs []*wire.TxOut, keyScope *waddrmgr.KeyScope,
account uint32, minconf int32, feeSatPerKb btcutil.Amount,
coinSelectionStrategy CoinSelectionStrategy, dryRun bool) (
tx *txauthor.AuthoredTx, err error) {
*txauthor.AuthoredTx, error) {
chainClient, err := w.requireChainClient()
if err != nil {
return nil, err
}
dbtx, err := w.db.BeginReadWriteTx()
if err != nil {
return nil, err
}
defer func() { _ = dbtx.Rollback() }()
addrmgrNs, changeSource, err := w.addrMgrWithChangeSource(
dbtx, keyScope, account,
)
if err != nil {
return nil, err
}
// Get current block's height and hash.
bs, err := chainClient.BlockStamp()
if err != nil {
return nil, err
}
eligible, err := w.findEligibleOutputs(
dbtx, keyScope, account, minconf, bs,
)
if err != nil {
return nil, err
}
var tx *txauthor.AuthoredTx
err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error {
addrmgrNs, changeSource, err := w.addrMgrWithChangeSource(
dbtx, keyScope, account,
)
if err != nil {
return err
}
var inputSource txauthor.InputSource
eligible, err := w.findEligibleOutputs(
dbtx, keyScope, account, minconf, bs,
)
if err != nil {
return err
}
switch coinSelectionStrategy {
// Pick largest outputs first.
case CoinSelectionLargest:
sort.Sort(sort.Reverse(byAmount(eligible)))
inputSource = makeInputSource(eligible)
var inputSource txauthor.InputSource
// Select coins at random. This prevents the creation of ever smaller
// utxos over time that may never become economical to spend.
case CoinSelectionRandom:
// Skip inputs that do not raise the total transaction output
// value at the requested fee rate.
var positivelyYielding []wtxmgr.Credit
for _, output := range eligible {
output := output
switch coinSelectionStrategy {
// Pick largest outputs first.
case CoinSelectionLargest:
sort.Sort(sort.Reverse(byAmount(eligible)))
inputSource = makeInputSource(eligible)
if !inputYieldsPositively(&output, feeSatPerKb) {
continue
// Select coins at random. This prevents the creation of ever
// smaller utxos over time that may never become economical to
// spend.
case CoinSelectionRandom:
// Skip inputs that do not raise the total transaction
// output value at the requested fee rate.
var positivelyYielding []wtxmgr.Credit
for _, output := range eligible {
output := output
if !inputYieldsPositively(&output, feeSatPerKb) {
continue
}
positivelyYielding = append(
positivelyYielding, output,
)
}
positivelyYielding = append(positivelyYielding, output)
rand.Shuffle(len(positivelyYielding), func(i, j int) {
positivelyYielding[i], positivelyYielding[j] =
positivelyYielding[j], positivelyYielding[i]
})
inputSource = makeInputSource(positivelyYielding)
}
rand.Shuffle(len(positivelyYielding), func(i, j int) {
positivelyYielding[i], positivelyYielding[j] =
positivelyYielding[j], positivelyYielding[i]
})
inputSource = makeInputSource(positivelyYielding)
}
tx, err = txauthor.NewUnsignedTransaction(
outputs, feeSatPerKb, inputSource, changeSource,
)
if err != nil {
return nil, err
}
// Randomize change position, if change exists, before signing. This
// doesn't affect the serialize size, so the change amount will still
// be valid.
if tx.ChangeIndex >= 0 {
tx.RandomizeChangePosition()
}
// If a dry run was requested, we return now before adding the input
// scripts, and don't commit the database transaction. The DB will be
// rolled back when this method returns to ensure the dry run didn't
// alter the DB in any way.
if dryRun {
return tx, nil
}
// Before committing the transaction, we'll sign our inputs. If the
// inputs are part of a watch-only account, there's no private key
// information stored, so we'll skip signing such.
var watchOnly bool
if keyScope == nil {
// If a key scope wasn't specified, then coin selection was
// performed from the default wallet accounts (NP2WKH, P2WKH),
// so any key scope provided doesn't impact the result of this
// call.
watchOnly, err = w.Manager.IsWatchOnlyAccount(
addrmgrNs, waddrmgr.KeyScopeBIP0084, account,
)
} else {
watchOnly, err = w.Manager.IsWatchOnlyAccount(
addrmgrNs, *keyScope, account,
)
}
if err != nil {
return nil, err
}
if !watchOnly {
err = tx.AddAllInputScripts(secretSource{w.Manager, addrmgrNs})
if err != nil {
return nil, err
}
err = validateMsgTx(tx.Tx, tx.PrevScripts, tx.PrevInputValues)
if err != nil {
return nil, err
}
}
if err := dbtx.Commit(); err != nil {
return nil, err
}
if tx.ChangeIndex >= 0 && account == waddrmgr.ImportedAddrAccount {
changeAmount := btcutil.Amount(tx.Tx.TxOut[tx.ChangeIndex].Value)
log.Warnf("Spend from imported account produced change: moving"+
" %v from imported account into default account.", changeAmount)
}
// Finally, we'll request the backend to notify us of the transaction
// that pays to the change address, if there is one, when it confirms.
if tx.ChangeIndex >= 0 {
changePkScript := tx.Tx.TxOut[tx.ChangeIndex].PkScript
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
changePkScript, w.chainParams,
tx, err = txauthor.NewUnsignedTransaction(
outputs, feeSatPerKb, inputSource, changeSource,
)
if err != nil {
return nil, err
return err
}
if err := chainClient.NotifyReceived(addrs); err != nil {
return nil, err
// Randomize change position, if change exists, before signing.
// This doesn't affect the serialize size, so the change amount
// will still be valid.
if tx.ChangeIndex >= 0 {
tx.RandomizeChangePosition()
}
// If a dry run was requested, we return now before adding the
// input scripts, and don't commit the database transaction.
// By returning an error, we make sure the walletdb.Update call
// rolls back the transaction. But we'll react to this specific
// error outside of the DB transaction so we can still return
// the produced chain TX.
if dryRun {
return walletdb.ErrDryRunRollBack
}
// Before committing the transaction, we'll sign our inputs. If
// the inputs are part of a watch-only account, there's no
// private key information stored, so we'll skip signing such.
var watchOnly bool
if keyScope == nil {
// If a key scope wasn't specified, then coin selection
// was performed from the default wallet accounts
// (NP2WKH, P2WKH), so any key scope provided doesn't
// impact the result of this call.
watchOnly, err = w.Manager.IsWatchOnlyAccount(
addrmgrNs, waddrmgr.KeyScopeBIP0084, account,
)
} else {
watchOnly, err = w.Manager.IsWatchOnlyAccount(
addrmgrNs, *keyScope, account,
)
}
if err != nil {
return err
}
if !watchOnly {
err = tx.AddAllInputScripts(
secretSource{w.Manager, addrmgrNs},
)
if err != nil {
return err
}
err = validateMsgTx(
tx.Tx, tx.PrevScripts, tx.PrevInputValues,
)
if err != nil {
return err
}
}
if tx.ChangeIndex >= 0 && account == waddrmgr.ImportedAddrAccount {
changeAmount := btcutil.Amount(
tx.Tx.TxOut[tx.ChangeIndex].Value,
)
log.Warnf("Spend from imported account produced "+
"change: moving %v from imported account into "+
"default account.", changeAmount)
}
// Finally, we'll request the backend to notify us of the
// transaction that pays to the change address, if there is one,
// when it confirms.
if tx.ChangeIndex >= 0 {
changePkScript := tx.Tx.TxOut[tx.ChangeIndex].PkScript
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
changePkScript, w.chainParams,
)
if err != nil {
return err
}
if err := chainClient.NotifyReceived(addrs); err != nil {
return err
}
}
return nil
})
if err != nil && err != walletdb.ErrDryRunRollBack {
return nil, err
}
return tx, nil

View file

@ -245,57 +245,70 @@ func (w *Wallet) ImportAccountDryRun(name string,
*waddrmgr.AccountProperties, []waddrmgr.ManagedAddress,
[]waddrmgr.ManagedAddress, error) {
var (
accountProps *waddrmgr.AccountProperties
externalAddrs []waddrmgr.ManagedAddress
internalAddrs []waddrmgr.ManagedAddress
)
// Start a database transaction that we'll never commit and always
// rollback.
tx, err := w.db.BeginReadWriteTx()
if err != nil {
return nil, nil, nil, err
}
defer func() {
_ = tx.Rollback()
}()
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
// rollback because we'll return a specific error in the end.
err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
// Import the account as usual.
accountProps, err := w.importAccount(
ns, name, accountPubKey, masterKeyFingerprint, addrType,
)
if err != nil {
return nil, nil, nil, err
}
// Import the account as usual.
var err error
accountProps, err = w.importAccount(
ns, name, accountPubKey, masterKeyFingerprint, addrType,
)
if err != nil {
return err
}
// Derive the external and internal addresses. Note that we could do
// this based on the provided accountPubKey alone, but we go through the
// ScopedKeyManager instead to ensure addresses will be derived as
// expected from the wallet's point-of-view.
manager, err := w.Manager.FetchScopedKeyManager(accountProps.KeyScope)
if err != nil {
return nil, nil, nil, err
}
// Derive the external and internal addresses. Note that we
// could do this based on the provided accountPubKey alone, but
// we go through the ScopedKeyManager instead to ensure
// addresses will be derived as expected from the wallet's
// point-of-view.
manager, err := w.Manager.FetchScopedKeyManager(
accountProps.KeyScope,
)
if err != nil {
return err
}
// The importAccount method above will cache the imported account within
// the scoped manager. Since this is a dry-run attempt, we'll want to
// invalidate the cache for it.
defer manager.InvalidateAccountCache(accountProps.AccountNumber)
// The importAccount method above will cache the imported
// account within the scoped manager. Since this is a dry-run
// attempt, we'll want to invalidate the cache for it.
defer manager.InvalidateAccountCache(accountProps.AccountNumber)
externalAddrs, err := manager.NextExternalAddresses(
ns, accountProps.AccountNumber, numAddrs,
)
if err != nil {
return nil, nil, nil, err
}
internalAddrs, err := manager.NextInternalAddresses(
ns, accountProps.AccountNumber, numAddrs,
)
if err != nil {
return nil, nil, nil, err
}
externalAddrs, err = manager.NextExternalAddresses(
ns, accountProps.AccountNumber, numAddrs,
)
if err != nil {
return err
}
internalAddrs, err = manager.NextInternalAddresses(
ns, accountProps.AccountNumber, numAddrs,
)
if err != nil {
return err
}
// Refresh the account's properties after generating the addresses.
accountProps, err = manager.AccountProperties(
ns, accountProps.AccountNumber,
)
if err != nil {
// Refresh the account's properties after generating the
// addresses.
accountProps, err = manager.AccountProperties(
ns, accountProps.AccountNumber,
)
if err != nil {
return err
}
// Make sure we always roll back the dry-run transaction by
// returning an error here.
return walletdb.ErrDryRunRollBack
})
if err != nil && err != walletdb.ErrDryRunRollBack {
return nil, nil, nil, err
}

View file

@ -186,33 +186,28 @@ func (w *Wallet) FundPsbt(packet *psbt.Packet, keyScope *waddrmgr.KeyScope,
inputSource := constantInputSource(credits)
// We also need a change source which needs to be able to insert
// a new change addresse into the database.
dbtx, err := w.db.BeginReadWriteTx()
if err != nil {
return 0, err
}
_, changeSource, err := w.addrMgrWithChangeSource(
dbtx, keyScope, account,
)
if err != nil {
return 0, err
}
// a new change address into the database.
err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error {
_, changeSource, err := w.addrMgrWithChangeSource(
dbtx, keyScope, account,
)
if err != nil {
return err
}
// Ask the txauthor to create a transaction with our selected
// coins. This will perform fee estimation and add a change
// output if necessary.
tx, err = txauthor.NewUnsignedTransaction(
txOut, feeSatPerKB, inputSource, changeSource,
)
if err != nil {
_ = dbtx.Rollback()
return 0, fmt.Errorf("fee estimation not successful: "+
"%v", err)
}
// Ask the txauthor to create a transaction with our
// selected coins. This will perform fee estimation and
// add a change output if necessary.
tx, err = txauthor.NewUnsignedTransaction(
txOut, feeSatPerKB, inputSource, changeSource,
)
if err != nil {
return fmt.Errorf("fee estimation not "+
"successful: %v", err)
}
// The transaction could be created, let's commit the DB TX to
// store the change address (if one was created).
err = dbtx.Commit()
return nil
})
if err != nil {
return 0, fmt.Errorf("could not add change address to "+
"database: %v", err)

View file

@ -365,6 +365,83 @@ func (db *db) Batch(f func(tx walletdb.ReadWriteTx) error) error {
})
}
// View opens a database read transaction and executes the function f with the
// transaction passed as a parameter. After f exits, the transaction is rolled
// back. If f errors, its error is returned, not a rollback error (if any
// occur). The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively.
func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error {
// We don't do any retries with bolt so we just initially call the reset
// function once.
reset()
tx, err := db.BeginReadTx()
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if tx != nil {
_ = tx.Rollback()
}
}()
err = f(tx)
rollbackErr := tx.Rollback()
if err != nil {
return err
}
if rollbackErr != nil {
return rollbackErr
}
return nil
}
// Update opens a database read/write transaction and executes the function f
// with the transaction passed as a parameter. After f exits, if f did not
// error, the transaction is committed. Otherwise, if f did error, the
// transaction is rolled back. If the rollback fails, the original error
// returned by f is still returned. If the commit fails, the commit error is
// returned. As callers may expect retries of the f closure (depending on the
// database backend used), the reset function will be called before each retry
// respectively.
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error {
// We don't do any retries with bolt so we just initially call the reset
// function once.
reset()
tx, err := db.BeginReadWriteTx()
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if tx != nil {
_ = tx.Rollback()
}
}()
err = f(tx)
if err != nil {
// Want to return the original error, not a rollback error if
// any occur.
_ = tx.Rollback()
return err
}
return tx.Commit()
}
// PrintStats returns all collected stats pretty printed into a string.
func (db *db) PrintStats() string {
return "<no stats are collected by bdb backend>"
}
// filesExists reports whether the named file or directory exists.
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {

View file

@ -39,6 +39,10 @@ var (
// ErrInvalid is returned if the specified database is not valid.
ErrInvalid = errors.New("invalid database")
// ErrDryRunRollBack is returned if a database transaction should be
// rolled back because its changes were a dry-run only.
ErrDryRunRollBack = errors.New("dry run only; should roll back")
)
// Errors that can occur when beginning or committing a transaction.

View file

@ -126,8 +126,9 @@ type ReadWriteBucket interface {
// ErrTxNotWritable if attempted against a read-only transaction.
Delete(key []byte) error
// Cursor returns a new cursor, allowing for iteration over the bucket's
// key/value pairs and nested buckets in forward or backward order.
// ReadWriteCursor returns a new cursor, allowing for iteration over the
// bucket's key/value pairs and nested buckets in forward or backward
// order.
ReadWriteCursor() ReadWriteCursor
// Tx returns the bucket's transaction.
@ -190,7 +191,7 @@ func BucketIsEmpty(bucket ReadBucket) bool {
return k == nil && v == nil
}
// DB represents an ACID database. All database access is performed through
// DB represents an ACID database. All database access is performed through
// read or read+write transactions.
type DB interface {
// BeginReadTx opens a database read transaction.
@ -205,16 +206,45 @@ type DB interface {
// Close cleanly shuts down the database and syncs all data.
Close() error
// PrintStats returns all collected stats pretty printed into a string.
PrintStats() string
// View opens a database read transaction and executes the function f
// with the transaction passed as a parameter. After f exits, the
// transaction is rolled back. If f errors, its error is returned, not a
// rollback error (if any occur). The passed reset function is called
// before the start of the transaction and can be used to reset
// intermediate state. As callers may expect retries of the f closure
// (depending on the database backend used), the reset function will be
// called before each retry respectively.
//
// NOTE: For new code, this method should be used directly instead of
// the package level View() function.
View(f func(tx ReadTx) error, reset func()) error
// Update opens a database read/write transaction and executes the
// function f with the transaction passed as a parameter. After f exits,
// if f did not error, the transaction is committed. Otherwise, if f did
// error, the transaction is rolled back. If the rollback fails, the
// original error returned by f is still returned. If the commit fails,
// the commit error is returned. As callers may expect retries of the f
// closure (depending on the database backend used), the reset function
// will be called before each retry respectively.
//
// NOTE: For new code, this method should be used directly instead of
// the package level Update() function.
Update(f func(tx ReadWriteTx) error, reset func()) error
}
// BatchDB is a special version of the main DB interface that allos the caller
// to specify write transactions that should be combine dtoegether if multiple
// BatchDB is a special version of the main DB interface that allows the caller
// to specify write transactions that should be combined toegether if multiple
// goroutines are calling the Batch method.
type BatchDB interface {
DB
// Batch is similar to the package-level Update method, but it will
// attempt to optismitcally combine the invocation of several
// attempt to optimistically combine the invocation of several
// transaction functions into a single db write transaction.
Batch(func(tx ReadWriteTx) error) error
}
@ -223,29 +253,11 @@ type BatchDB interface {
// transaction passed as a parameter. After f exits, the transaction is rolled
// back. If f errors, its error is returned, not a rollback error (if any
// occur).
//
// NOTE: For new code the database backend's View method should be used directly
// as this package level function will be phased out in the future.
func View(db DB, f func(tx ReadTx) error) error {
tx, err := db.BeginReadTx()
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if tx != nil {
tx.Rollback()
}
}()
err = f(tx)
rollbackErr := tx.Rollback()
if err != nil {
return err
}
if rollbackErr != nil {
return rollbackErr
}
return nil
return db.View(f, func() {})
}
// Update opens a database read/write transaction and executes the function f
@ -254,28 +266,11 @@ func View(db DB, f func(tx ReadTx) error) error {
// transaction is rolled back. If the rollback fails, the original error
// returned by f is still returned. If the commit fails, the commit error is
// returned.
//
// NOTE: For new code the database backend's Update method should be used
// directly as this package level function will be phased out in the future.
func Update(db DB, f func(tx ReadWriteTx) error) error {
tx, err := db.BeginReadWriteTx()
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if tx != nil {
tx.Rollback()
}
}()
err = f(tx)
if err != nil {
// Want to return the original error, not a rollback error if
// any occur.
_ = tx.Rollback()
return err
}
return tx.Commit()
return db.Update(f, func() {})
}
// Batch opens a database read/write transaction and executes the function f

View file

@ -1575,50 +1575,3 @@ func fetchVersion(ns walletdb.ReadBucket) (uint32, error) {
return byteOrder.Uint32(v), nil
}
func scopedUpdate(db walletdb.DB, namespaceKey []byte, f func(walletdb.ReadWriteBucket) error) error {
tx, err := db.BeginReadWriteTx()
if err != nil {
str := "cannot begin update"
return storeError(ErrDatabase, str, err)
}
err = f(tx.ReadWriteBucket(namespaceKey))
if err != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
const desc = "rollback failed"
serr, ok := err.(Error)
if !ok {
// This really shouldn't happen.
return storeError(ErrDatabase, desc, rollbackErr)
}
serr.Desc = desc + ": " + serr.Desc
return serr
}
return err
}
err = tx.Commit()
if err != nil {
str := "commit failed"
return storeError(ErrDatabase, str, err)
}
return nil
}
func scopedView(db walletdb.DB, namespaceKey []byte, f func(walletdb.ReadBucket) error) error {
tx, err := db.BeginReadTx()
if err != nil {
str := "cannot begin view"
return storeError(ErrDatabase, str, err)
}
err = f(tx.ReadBucket(namespaceKey))
rollbackErr := tx.Rollback()
if err != nil {
return err
}
if rollbackErr != nil {
str := "cannot close view"
return storeError(ErrDatabase, str, rollbackErr)
}
return nil
}