Merge pull request #573 from wpaulino/notify-received

wallet: notify wallet upon relevant transaction confirmation
This commit is contained in:
Olaoluwa Osuntokun 2018-11-14 18:39:41 -08:00 committed by GitHub
commit 4c01c0878c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 470 additions and 44 deletions

View file

@ -1,6 +1,7 @@
package waddrmgr
import (
"errors"
"fmt"
"time"
@ -26,6 +27,10 @@ var versions = []migration.Version{
Number: 6,
Migration: populateBirthdayBlock,
},
{
Number: 7,
Migration: resetSyncedBlockToBirthday,
},
}
// getLatestVersion returns the version number of the latest database version.
@ -350,3 +355,20 @@ func populateBirthdayBlock(ns walletdb.ReadWriteBucket) error {
Hash: *birthdayHash,
})
}
// resetSyncedBlockToBirthday is a migration that resets the wallet's currently
// synced block to its birthday block. This essentially serves as a migration to
// force a rescan of the wallet.
func resetSyncedBlockToBirthday(ns walletdb.ReadWriteBucket) error {
syncBucket := ns.NestedReadWriteBucket(syncBucketName)
if syncBucket == nil {
return errors.New("sync bucket does not exist")
}
birthdayBlock, err := fetchBirthdayBlock(ns)
if err != nil {
return err
}
return putSyncedTo(ns, &birthdayBlock)
}

View file

@ -215,3 +215,84 @@ func TestMigrationPopulateBirthdayBlockEstimateTooFar(t *testing.T) {
false,
)
}
// TestMigrationResetSyncedBlockToBirthday ensures that the wallet properly sees
// its synced to block as the birthday block after resetting it.
func TestMigrationResetSyncedBlockToBirthday(t *testing.T) {
t.Parallel()
var birthdayBlock BlockStamp
beforeMigration := func(ns walletdb.ReadWriteBucket) error {
// To test this migration, we'll assume we're synced to a chain
// of 100 blocks, with our birthday being the 50th block.
block := &BlockStamp{}
for i := int32(1); i < 100; i++ {
block.Height = i
blockHash := bytes.Repeat([]byte(string(i)), 32)
copy(block.Hash[:], blockHash)
if err := putSyncedTo(ns, block); err != nil {
return err
}
}
const birthdayHeight = 50
birthdayHash, err := fetchBlockHash(ns, birthdayHeight)
if err != nil {
return err
}
birthdayBlock = BlockStamp{
Hash: *birthdayHash, Height: birthdayHeight,
}
return putBirthdayBlock(ns, birthdayBlock)
}
afterMigration := func(ns walletdb.ReadWriteBucket) error {
// After the migration has succeeded, we should see that the
// database's synced block now reflects the birthday block.
syncedBlock, err := fetchSyncedTo(ns)
if err != nil {
return err
}
if syncedBlock.Height != birthdayBlock.Height {
return fmt.Errorf("expected synced block height %d, "+
"got %d", birthdayBlock.Height,
syncedBlock.Height)
}
if !syncedBlock.Hash.IsEqual(&birthdayBlock.Hash) {
return fmt.Errorf("expected synced block height %v, "+
"got %v", birthdayBlock.Hash, syncedBlock.Hash)
}
return nil
}
// We can now apply the migration and expect it not to fail.
applyMigration(
t, beforeMigration, afterMigration, resetSyncedBlockToBirthday,
false,
)
}
// TestMigrationResetSyncedBlockToBirthdayWithNoBirthdayBlock ensures that we
// cannot reset our synced to block to our birthday block if one isn't
// available.
func TestMigrationResetSyncedBlockToBirthdayWithNoBirthdayBlock(t *testing.T) {
t.Parallel()
// To replicate the scenario where the database is not aware of a
// birthday block, we won't set one. This should cause the migration to
// fail.
beforeMigration := func(walletdb.ReadWriteBucket) error {
return nil
}
afterMigration := func(walletdb.ReadWriteBucket) error {
return nil
}
applyMigration(
t, beforeMigration, afterMigration, resetSyncedBlockToBirthday,
true,
)
}

View file

@ -170,6 +170,21 @@ func (w *Wallet) txToOutputs(outputs []*wire.TxOut, account uint32,
" %v from imported account into default account.", changeAmount)
}
// Finally, we'll request the backend to notify us of the transaction
// that pays to the change address, if there is one, when it confirms.
if tx.ChangeIndex >= 0 {
changePkScript := tx.Tx.TxOut[tx.ChangeIndex].PkScript
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
changePkScript, w.chainParams,
)
if err != nil {
return nil, err
}
if err := chainClient.NotifyReceived(addrs); err != nil {
return nil, err
}
}
return tx, nil
}

View file

@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb/migration"
"github.com/btcsuite/btcwallet/wtxmgr"
)
// log is a logger that is initialized with no output filters. This
@ -34,6 +35,7 @@ func UseLogger(logger btclog.Logger) {
migration.UseLogger(logger)
waddrmgr.UseLogger(logger)
wtxmgr.UseLogger(logger)
}
// LogClosure is a closure that can be printed with %v to be used to

View file

@ -2786,6 +2786,15 @@ func (w *Wallet) resendUnminedTxs() {
for _, tx := range txs {
resp, err := chainClient.SendRawTransaction(tx, false)
if err != nil {
// If the transaction has already been accepted into the
// mempool, we can continue without logging the error.
switch {
case strings.Contains(err.Error(), "already have transaction"):
fallthrough
case strings.Contains(err.Error(), "txn-already-known"):
continue
}
log.Debugf("Could not resend transaction %v: %v",
tx.TxHash(), err)
@ -2817,9 +2826,12 @@ func (w *Wallet) resendUnminedTxs() {
// As the transaction was rejected, we'll attempt to
// remove the unmined transaction all together.
// Otherwise, we'll keep attempting to rebroadcast
// this, and we may be computing our balance
// incorrectly if this tx credits or debits to us.
// Otherwise, we'll keep attempting to rebroadcast this,
// and we may be computing our balance incorrectly if
// this transaction credits or debits to us.
//
// TODO(wilmer): if already confirmed, move to mined
// bucket - need to determine the confirmation block.
err := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)
@ -2952,6 +2964,11 @@ func (w *Wallet) NewChangeAddress(account uint32,
return addr, nil
}
// newChangeAddress returns a new change address for the wallet.
//
// NOTE: This method requires the caller to use the backend's NotifyReceived
// method in order to detect when an on-chain transaction pays to the address
// being created.
func (w *Wallet) newChangeAddress(addrmgrNs walletdb.ReadWriteBucket,
account uint32) (btcutil.Address, error) {
@ -3305,7 +3322,7 @@ func (w *Wallet) PublishTransaction(tx *wire.MsgTx) error {
// from the database (along with cleaning up all inputs used, and outputs
// created) if the transaction is rejected by the back end.
func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
server, err := w.requireChainClient()
chainClient, err := w.requireChainClient()
if err != nil {
return nil, err
}
@ -3325,7 +3342,33 @@ func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
return nil, err
}
txid, err := server.SendRawTransaction(tx, false)
// We'll also ask to be notified of the transaction once it confirms
// on-chain. This is done outside of the database transaction to prevent
// backend interaction within it.
//
// NOTE: In some cases, it's possible that the transaction to be
// broadcast is not directly relevant to the user's wallet, e.g.,
// multisig. In either case, we'll still ask to be notified of when it
// confirms to maintain consistency.
//
// TODO(wilmer): import script as external if the address does not
// belong to the wallet to handle confs during restarts?
for _, txOut := range tx.TxOut {
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
txOut.PkScript, w.chainParams,
)
if err != nil {
// Non-standard outputs can safely be skipped because
// they're not supported by the wallet.
continue
}
if err := chainClient.NotifyReceived(addrs); err != nil {
return nil, err
}
}
txid, err := chainClient.SendRawTransaction(tx, false)
switch {
case err == nil:
return txid, nil

View file

@ -1311,50 +1311,42 @@ func createStore(ns walletdb.ReadWriteBucket) error {
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketBlocks)
if err != nil {
// Finally, create all of our required descendant buckets.
return createBuckets(ns)
}
// createBuckets creates all of the descendants buckets required for the
// transaction store to properly carry its duties.
func createBuckets(ns walletdb.ReadWriteBucket) error {
if _, err := ns.CreateBucket(bucketBlocks); err != nil {
str := "failed to create blocks bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketTxRecords)
if err != nil {
if _, err := ns.CreateBucket(bucketTxRecords); err != nil {
str := "failed to create tx records bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketCredits)
if err != nil {
if _, err := ns.CreateBucket(bucketCredits); err != nil {
str := "failed to create credits bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketDebits)
if err != nil {
if _, err := ns.CreateBucket(bucketDebits); err != nil {
str := "failed to create debits bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketUnspent)
if err != nil {
if _, err := ns.CreateBucket(bucketUnspent); err != nil {
str := "failed to create unspent bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketUnmined)
if err != nil {
if _, err := ns.CreateBucket(bucketUnmined); err != nil {
str := "failed to create unmined bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketUnminedCredits)
if err != nil {
if _, err := ns.CreateBucket(bucketUnminedCredits); err != nil {
str := "failed to create unmined credits bucket"
return storeError(ErrDatabase, str, err)
}
_, err = ns.CreateBucket(bucketUnminedInputs)
if err != nil {
if _, err := ns.CreateBucket(bucketUnminedInputs); err != nil {
str := "failed to create unmined inputs bucket"
return storeError(ErrDatabase, str, err)
}
@ -1362,6 +1354,45 @@ func createStore(ns walletdb.ReadWriteBucket) error {
return nil
}
// deleteBuckets deletes all of the descendants buckets required for the
// transaction store to properly carry its duties.
func deleteBuckets(ns walletdb.ReadWriteBucket) error {
if err := ns.DeleteNestedBucket(bucketBlocks); err != nil {
str := "failed to delete blocks bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketTxRecords); err != nil {
str := "failed to delete tx records bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketCredits); err != nil {
str := "failed to delete credits bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketDebits); err != nil {
str := "failed to delete debits bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketUnspent); err != nil {
str := "failed to delete unspent bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketUnmined); err != nil {
str := "failed to delete unmined bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketUnminedCredits); err != nil {
str := "failed to delete unmined credits bucket"
return storeError(ErrDatabase, str, err)
}
if err := ns.DeleteNestedBucket(bucketUnminedInputs); err != nil {
str := "failed to delete unmined inputs bucket"
return storeError(ErrDatabase, str, err)
}
return nil
}
// putVersion modifies the version of the store to reflect the given version
// number.
func putVersion(ns walletdb.ReadWriteBucket, version uint32) error {

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package wtxmgr_test
package wtxmgr
import (
"fmt"
@ -11,29 +11,28 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/wtxmgr"
)
var (
// Spends: bogus
// Outputs: 10 BTC
exampleTxRecordA *wtxmgr.TxRecord
exampleTxRecordA *TxRecord
// Spends: A:0
// Outputs: 5 BTC, 5 BTC
exampleTxRecordB *wtxmgr.TxRecord
exampleTxRecordB *TxRecord
)
func init() {
tx := spendOutput(&chainhash.Hash{}, 0, 10e8)
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx, timeNow())
rec, err := NewTxRecordFromMsgTx(tx, timeNow())
if err != nil {
panic(err)
}
exampleTxRecordA = rec
tx = spendOutput(&exampleTxRecordA.Hash, 0, 5e8, 5e8)
rec, err = wtxmgr.NewTxRecordFromMsgTx(tx, timeNow())
rec, err = NewTxRecordFromMsgTx(tx, timeNow())
if err != nil {
panic(err)
}
@ -183,12 +182,12 @@ func Example_basicUsage() {
}
// Create and open the transaction store in the provided namespace.
err = wtxmgr.Create(b)
err = Create(b)
if err != nil {
fmt.Println(err)
return
}
s, err := wtxmgr.Open(b, &chaincfg.TestNet3Params)
s, err := Open(b, &chaincfg.TestNet3Params)
if err != nil {
fmt.Println(err)
return

View file

@ -9,12 +9,17 @@ import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log = btclog.Disabled
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
log = btclog.Disabled
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
@ -23,3 +28,20 @@ func DisableLog() {
func UseLogger(logger btclog.Logger) {
log = logger
}
// LogClosure is a closure that can be printed with %v to be used to
// generate expensive-to-create data for a detailed log level and avoid doing
// the work if the data isn't printed.
type logClosure func() string
// String invokes the log closure and returns the results string.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over the passed function which allows
// it to be used as a parameter in a logging function that is only invoked when
// the logging level is such that the message will actually be logged.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

View file

@ -14,6 +14,10 @@ var versions = []migration.Version{
Number: 1,
Migration: nil,
},
{
Number: 2,
Migration: dropTransactionHistory,
},
}
// getLatestVersion returns the version number of the latest database version.
@ -81,3 +85,26 @@ func (m *MigrationManager) SetVersion(ns walletdb.ReadWriteBucket,
func (m *MigrationManager) Versions() []migration.Version {
return versions
}
// dropTransactionHistory is a migration that attempts to recreate the
// transaction store with a clean state.
func dropTransactionHistory(ns walletdb.ReadWriteBucket) error {
log.Info("Dropping wallet transaction history")
// To drop the store's transaction history, we'll need to remove all of
// the relevant descendant buckets and key/value pairs.
if err := deleteBuckets(ns); err != nil {
return err
}
if err := ns.Delete(rootMinedBalance); err != nil {
return err
}
// With everything removed, we'll now recreate our buckets.
if err := createBuckets(ns); err != nil {
return err
}
// Finally, we'll insert a 0 value for our mined balance.
return putMinedBalance(ns, 0)
}

187
wtxmgr/migrations_test.go Normal file
View file

@ -0,0 +1,187 @@
package wtxmgr
import (
"errors"
"fmt"
"testing"
"github.com/btcsuite/btcwallet/walletdb"
)
// applyMigration is a helper function that allows us to assert the state of the
// top-level bucket before and after a migration. This can be used to ensure
// the correctness of migrations.
func applyMigration(t *testing.T,
beforeMigration, afterMigration func(walletdb.ReadWriteBucket, *Store) error,
migration func(walletdb.ReadWriteBucket) error, shouldFail bool) {
t.Helper()
// We'll start by setting up our transaction store backed by a database.
store, db, teardown, err := testStore()
if err != nil {
t.Fatalf("unable to create test store: %v", err)
}
defer teardown()
// First, we'll run the beforeMigration closure, which contains the
// database modifications/assertions needed before proceeding with the
// migration.
err = walletdb.Update(db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(namespaceKey)
if ns == nil {
return errors.New("top-level namespace does not exist")
}
return beforeMigration(ns, store)
})
if err != nil {
t.Fatalf("unable to run beforeMigration func: %v", err)
}
// Then, we'll run the migration itself and fail if it does not match
// its expected result.
err = walletdb.Update(db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(namespaceKey)
if ns == nil {
return errors.New("top-level namespace does not exist")
}
return migration(ns)
})
if err != nil && !shouldFail {
t.Fatalf("unable to perform migration: %v", err)
} else if err == nil && shouldFail {
t.Fatal("expected migration to fail, but did not")
}
// Finally, we'll run the afterMigration closure, which contains the
// assertions needed in order to guarantee than the migration was
// successful.
err = walletdb.Update(db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(namespaceKey)
if ns == nil {
return errors.New("top-level namespace does not exist")
}
return afterMigration(ns, store)
})
if err != nil {
t.Fatalf("unable to run afterMigration func: %v", err)
}
}
// TestMigrationDropTransactionHistory ensures that a transaction store is reset
// to a clean state after dropping its transaction history.
func TestMigrationDropTransactionHistory(t *testing.T) {
t.Parallel()
// checkTransactions is a helper function that will assert the correct
// state of the transaction store based on whether the migration has
// completed or not.
checkTransactions := func(ns walletdb.ReadWriteBucket, s *Store,
afterMigration bool) error {
// We should see one confirmed unspent output before the
// migration, and none after.
utxos, err := s.UnspentOutputs(ns)
if err != nil {
return err
}
if len(utxos) == 0 && !afterMigration {
return errors.New("expected to find 1 utxo, found none")
}
if len(utxos) > 0 && afterMigration {
return fmt.Errorf("expected to find 0 utxos, found %d",
len(utxos))
}
// We should see one unconfirmed transaction before the
// migration, and none after.
unconfirmedTxs, err := s.UnminedTxs(ns)
if err != nil {
return err
}
if len(unconfirmedTxs) == 0 && !afterMigration {
return errors.New("expected to find 1 unconfirmed " +
"transaction, found none")
}
if len(unconfirmedTxs) > 0 && afterMigration {
return fmt.Errorf("expected to find 0 unconfirmed "+
"transactions, found %d", len(unconfirmedTxs))
}
// We should have a non-zero balance before the migration, and
// zero after.
minedBalance, err := fetchMinedBalance(ns)
if err != nil {
return err
}
if minedBalance == 0 && !afterMigration {
return errors.New("expected non-zero balance before " +
"migration")
}
if minedBalance > 0 && afterMigration {
return fmt.Errorf("expected zero balance after "+
"migration, got %d", minedBalance)
}
return nil
}
beforeMigration := func(ns walletdb.ReadWriteBucket, s *Store) error {
// We'll start by adding two transactions to the store: a
// confirmed transaction and an unconfirmed transaction one.
// The confirmed transaction will spend from a coinbase output,
// while the unconfirmed will spend an output from the confirmed
// transaction.
cb := newCoinBase(1e8)
cbRec, err := NewTxRecordFromMsgTx(cb, timeNow())
if err != nil {
return err
}
b := &BlockMeta{Block: Block{Height: 100}}
confirmedSpend := spendOutput(&cbRec.Hash, 0, 5e7, 4e7)
confirmedSpendRec, err := NewTxRecordFromMsgTx(
confirmedSpend, timeNow(),
)
if err := s.InsertTx(ns, confirmedSpendRec, b); err != nil {
return err
}
err = s.AddCredit(ns, confirmedSpendRec, b, 1, true)
if err != nil {
return err
}
unconfimedSpend := spendOutput(
&confirmedSpendRec.Hash, 0, 5e6, 5e6,
)
unconfirmedSpendRec, err := NewTxRecordFromMsgTx(
unconfimedSpend, timeNow(),
)
if err != nil {
return err
}
if err := s.InsertTx(ns, unconfirmedSpendRec, nil); err != nil {
return err
}
err = s.AddCredit(ns, unconfirmedSpendRec, nil, 1, true)
if err != nil {
return err
}
// Ensure these transactions exist within the store.
return checkTransactions(ns, s, false)
}
afterMigration := func(ns walletdb.ReadWriteBucket, s *Store) error {
// Assuming the migration was successful, we should see that the
// store no longer has the transaction history prior to the
// migration.
return checkTransactions(ns, s, true)
}
// We can now apply the migration and expect it not to fail.
applyMigration(
t, beforeMigration, afterMigration, dropTransactionHistory,
false,
)
}

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package wtxmgr_test
package wtxmgr
import (
"bytes"
@ -16,7 +16,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb"
. "github.com/btcsuite/btcwallet/wtxmgr"
)
type queryState struct {

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package wtxmgr_test
package wtxmgr
import (
"bytes"
@ -19,8 +19,6 @@ import (
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/walletdb"
_ "github.com/btcsuite/btcwallet/walletdb/bdb"
"github.com/btcsuite/btcwallet/wtxmgr"
. "github.com/btcsuite/btcwallet/wtxmgr"
)
// Received transaction output for mainnet outpoint
@ -1572,7 +1570,7 @@ func testInsertMempoolDoubleSpendTx(t *testing.T, first bool) {
Time: time.Now(),
}
var confirmedSpendRec *wtxmgr.TxRecord
var confirmedSpendRec *TxRecord
if first {
confirmedSpendRec = firstSpendRec
} else {