Merge pull request #597 from wpaulino/tx-broadcast-err-handling

wallet/wallet: remove invalid transactions when broadcast fails
This commit is contained in:
Olaoluwa Osuntokun 2019-03-08 21:10:47 -08:00 committed by GitHub
commit 25804bf90f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2831,96 +2831,29 @@ func (w *Wallet) LockedOutpoints() []btcjson.TransactionInput {
// credits that are not known to have been mined into a block, and attempts // credits that are not known to have been mined into a block, and attempts
// to send each to the chain server for relay. // to send each to the chain server for relay.
func (w *Wallet) resendUnminedTxs() { func (w *Wallet) resendUnminedTxs() {
chainClient, err := w.requireChainClient()
if err != nil {
log.Errorf("No chain server available to resend unmined transactions")
return
}
var txs []*wire.MsgTx var txs []*wire.MsgTx
err = walletdb.View(w.db, func(tx walletdb.ReadTx) error { err := walletdb.View(w.db, func(tx walletdb.ReadTx) error {
txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey)
var err error var err error
txs, err = w.TxStore.UnminedTxs(txmgrNs) txs, err = w.TxStore.UnminedTxs(txmgrNs)
return err return err
}) })
if err != nil { if err != nil {
log.Errorf("Cannot load unmined transactions for resending: %v", err) log.Errorf("Unable to retrieve unconfirmed transactions to "+
"resend: %v", err)
return return
} }
for _, tx := range txs { for _, tx := range txs {
resp, err := chainClient.SendRawTransaction(tx, false) txHash, err := w.publishTransaction(tx)
if err != nil { if err != nil {
// If the transaction has already been accepted into the log.Debugf("Unable to rebroadcast transaction %v: %v",
// mempool, we can continue without logging the error.
switch {
case strings.Contains(err.Error(), "already have transaction"):
fallthrough
case strings.Contains(err.Error(), "txn-already-known"):
continue
}
log.Debugf("Could not resend transaction %v: %v",
tx.TxHash(), err) tx.TxHash(), err)
// We'll only stop broadcasting transactions if we
// detect that the output has already been fully spent,
// is an orphan, or is conflicting with another
// transaction.
//
// TODO(roasbeef): SendRawTransaction needs to return
// concrete error types, no need for string matching
switch {
// The following are errors returned from btcd's
// mempool.
case strings.Contains(err.Error(), "spent"):
case strings.Contains(err.Error(), "orphan"):
case strings.Contains(err.Error(), "conflict"):
case strings.Contains(err.Error(), "already exists"):
case strings.Contains(err.Error(), "negative"):
// The following errors are returned from bitcoind's
// mempool.
case strings.Contains(err.Error(), "Missing inputs"):
case strings.Contains(err.Error(), "already in block chain"):
case strings.Contains(err.Error(), "fee not met"):
default:
continue continue
} }
// As the transaction was rejected, we'll attempt to log.Debugf("Successfully rebroadcast unconfirmed transaction %v",
// remove the unmined transaction all together. txHash)
// Otherwise, we'll keep attempting to rebroadcast this,
// and we may be computing our balance incorrectly if
// this transaction credits or debits to us.
//
// TODO(wilmer): if already confirmed, move to mined
// bucket - need to determine the confirmation block.
err := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)
txRec, err := wtxmgr.NewTxRecordFromMsgTx(
tx, time.Now(),
)
if err != nil {
return err
}
return w.TxStore.RemoveUnminedTx(txmgrNs, txRec)
})
if err != nil {
log.Warnf("unable to remove conflicting "+
"tx %v: %v", tx.TxHash(), err)
continue
}
log.Infof("Removed conflicting tx: %v", spew.Sdump(tx))
continue
}
log.Debugf("Resent unmined transaction %v", resp)
} }
} }
@ -3224,7 +3157,7 @@ func (w *Wallet) SendOutputs(outputs []*wire.TxOut, account uint32,
return nil, err return nil, err
} }
txHash, err := w.publishTransaction(createdTx.Tx) txHash, err := w.reliablyPublishTransaction(createdTx.Tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -3380,16 +3313,16 @@ func (w *Wallet) SignTransaction(tx *wire.MsgTx, hashType txscript.SigHashType,
// This function is unstable and will be removed once syncing code is moved out // This function is unstable and will be removed once syncing code is moved out
// of the wallet. // of the wallet.
func (w *Wallet) PublishTransaction(tx *wire.MsgTx) error { func (w *Wallet) PublishTransaction(tx *wire.MsgTx) error {
_, err := w.publishTransaction(tx) _, err := w.reliablyPublishTransaction(tx)
return err return err
} }
// publishTransaction is the private version of PublishTransaction which // reliablyPublishTransaction is a superset of publishTransaction which contains
// contains the primary logic required for publishing a transaction, updating // the primary logic required for publishing a transaction, updating the
// the relevant database state, and finally possible removing the transaction // relevant database state, and finally possible removing the transaction from
// from the database (along with cleaning up all inputs used, and outputs // the database (along with cleaning up all inputs used, and outputs created) if
// created) if the transaction is rejected by the back end. // the transaction is rejected by the backend.
func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) { func (w *Wallet) reliablyPublishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
chainClient, err := w.requireChainClient() chainClient, err := w.requireChainClient()
if err != nil { if err != nil {
return nil, err return nil, err
@ -3436,41 +3369,93 @@ func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
} }
} }
return w.publishTransaction(tx)
}
// publishTransaction attempts to send an unconfirmed transaction to the
// wallet's current backend. In the event that sending the transaction fails for
// whatever reason, it will be removed from the wallet's unconfirmed transaction
// store.
func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
chainClient, err := w.requireChainClient()
if err != nil {
return nil, err
}
txid, err := chainClient.SendRawTransaction(tx, false) txid, err := chainClient.SendRawTransaction(tx, false)
switch { switch {
case err == nil: case err == nil:
return txid, nil return txid, nil
// The following are errors returned from btcd's mempool. // Since we have different backends that can be used with the wallet,
case strings.Contains(err.Error(), "spent"): // we'll need to check specific errors for each one.
fallthrough //
case strings.Contains(err.Error(), "orphan"): // If the transaction is already in the mempool, we can just return now.
fallthrough //
case strings.Contains(err.Error(), "conflict"): // This error is returned when broadcasting/sending a transaction to a
// btcd node that already has it in their mempool.
case strings.Contains(err.Error(), "already have transaction"):
fallthrough fallthrough
// The following errors are returned from bitcoind's mempool. // This error is returned when broadcasting a transaction to a bitcoind
case strings.Contains(err.Error(), "fee not met"): // node that already has it in their mempool.
case strings.Contains(err.Error(), "txn-already-in-mempool"):
return txid, nil
// If the transaction has already confirmed, we can safely remove it
// from the unconfirmed store as it should already exist within the
// confirmed store. We'll avoid returning an error as the broadcast was
// in a sense successful.
//
// This error is returned when broadcasting/sending a transaction that
// has already confirmed to a btcd node.
case strings.Contains(err.Error(), "transaction already exists"):
fallthrough fallthrough
case strings.Contains(err.Error(), "Missing inputs"):
// This error is returned when broadcasting a transaction that has
// already confirmed to a bitcoind node.
case strings.Contains(err.Error(), "txn-already-known"):
fallthrough fallthrough
case strings.Contains(err.Error(), "already in block chain"):
// If the transaction was rejected, then we'll remove it from // This error is returned when sending a transaction that has already
// the txstore, as otherwise, we'll attempt to continually // confirmed to a bitcoind node over RPC.
// re-broadcast it, and the utxo state of the wallet won't be case strings.Contains(err.Error(), "transaction already in block chain"):
// accurate.
dbErr := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error { dbErr := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey) txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)
txRec, err := wtxmgr.NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
return err
}
return w.TxStore.RemoveUnminedTx(txmgrNs, txRec) return w.TxStore.RemoveUnminedTx(txmgrNs, txRec)
}) })
if dbErr != nil { if dbErr != nil {
return nil, fmt.Errorf("unable to broadcast tx: %v, "+ log.Warnf("Unable to remove confirmed transaction %v "+
"unable to remove invalid tx: %v", err, dbErr) "from unconfirmed store: %v", tx.TxHash(), dbErr)
} }
return nil, err return txid, nil
// If the transaction was rejected for whatever other reason, then we'll
// remove it from the transaction store, as otherwise, we'll attempt to
// continually re-broadcast it, and the UTXO state of the wallet won't
// be accurate.
default: default:
dbErr := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)
txRec, err := wtxmgr.NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
return err
}
return w.TxStore.RemoveUnminedTx(txmgrNs, txRec)
})
if dbErr != nil {
log.Warnf("Unable to remove invalid transaction %v: %v",
tx.TxHash(), dbErr)
} else {
log.Infof("Removed invalid transaction: %v",
spew.Sdump(tx))
}
return nil, err return nil, err
} }
} }