Handle out-of-order notifications from btcd.

Notifications ariving from btcd were being reordered (each handled by
its own goroutine, rather then being always sent in the order they
originated).  This was breaking the new transaction store by inserting
transaction records in an 'impossible' manner, that is, inserting txs
without block info after the store already held records of the same tx
with block info, without first performing a rollback.

This is handled by the transaction store insert methods by checking
for identical transactions (double spends with the same tx sha), but
where the block heights mismatch and the new record does not have a
block set.  The error is returned all the way up to the goroutine
running each rpc request/notification handler, and if hit, the btcd
connection is closed and all accounts are reopened from disk.  This is
not optimal, but it allows us to use the connect logic to correctly
catch us up to the best chain with the last good state of all accounts
while only rescanning a few blocks.

Fixes .
This commit is contained in:
Josh Rickmar 2014-02-28 13:03:23 -05:00
parent 76c6379a54
commit 2e76bcd159
7 changed files with 296 additions and 147 deletions

View file

@ -43,6 +43,7 @@ type AccountManager struct {
// binary semaphore channel to prevent incorrect access. // binary semaphore channel to prevent incorrect access.
bsem chan struct{} bsem chan struct{}
openAccounts chan struct{}
accessAccount chan *accessAccountRequest accessAccount chan *accessAccountRequest
accessAll chan *accessAllRequest accessAll chan *accessAllRequest
add chan *Account add chan *Account
@ -55,6 +56,7 @@ type AccountManager struct {
func NewAccountManager() *AccountManager { func NewAccountManager() *AccountManager {
am := &AccountManager{ am := &AccountManager{
bsem: make(chan struct{}, 1), bsem: make(chan struct{}, 1),
openAccounts: make(chan struct{}, 1),
accessAccount: make(chan *accessAccountRequest), accessAccount: make(chan *accessAccountRequest),
accessAll: make(chan *accessAllRequest), accessAll: make(chan *accessAllRequest),
add: make(chan *Account), add: make(chan *Account),
@ -81,6 +83,20 @@ func (am *AccountManager) Start() {
for { for {
select { select {
case <-am.openAccounts:
// Write all old accounts before proceeding.
for e := l.Front(); e != nil; e = e.Next() {
a := e.Value.(*Account)
am.ds.FlushAccount(a)
}
m = OpenAccounts()
l.Init()
for _, a := range m {
l.PushBack(a)
}
case access := <-am.accessAccount: case access := <-am.accessAccount:
a, ok := m[access.name] a, ok := m[access.name]
access.resp <- &accessAccountResponse{ access.resp <- &accessAccountResponse{
@ -129,6 +145,10 @@ func (am *AccountManager) Release() {
am.bsem <- struct{}{} am.bsem <- struct{}{}
} }
func (am *AccountManager) OpenAccounts() {
am.openAccounts <- struct{}{}
}
type accessAccountRequest struct { type accessAccountRequest struct {
name string name string
resp chan *accessAccountResponse resp chan *accessAccountResponse
@ -196,7 +216,7 @@ func (am *AccountManager) RegisterNewAccount(a *Account) error {
// Rollback rolls back each managed Account to the state before the block // Rollback rolls back each managed Account to the state before the block
// specified by height and hash was connected to the main chain. // specified by height and hash was connected to the main chain.
func (am *AccountManager) Rollback(height int32, hash *btcwire.ShaHash) { func (am *AccountManager) Rollback(height int32, hash *btcwire.ShaHash) {
log.Debugf("Rolling back tx history since block height %v", height) log.Infof("Rolling back tx history since block height %v", height)
for _, a := range am.AllAccounts() { for _, a := range am.AllAccounts() {
a.TxStore.Rollback(height) a.TxStore.Rollback(height)
@ -204,13 +224,6 @@ func (am *AccountManager) Rollback(height int32, hash *btcwire.ShaHash) {
} }
} }
// Rollback reverts each stored Account to a state before the block
// with the passed chainheight and block hash was connected to the main
// chain. This is used to remove transactions and utxos for each wallet
// that occured on a chain no longer considered to be the main chain.
func (a *Account) Rollback(height int32, hash *btcwire.ShaHash) {
}
// BlockNotify notifies all frontends of any changes from the new block, // BlockNotify notifies all frontends of any changes from the new block,
// including changed balances. Each account is then set to be synced // including changed balances. Each account is then set to be synced
// with the latest block. // with the latest block.

28
cmd.go
View file

@ -146,10 +146,9 @@ func main() {
// Check and update any old file locations. // Check and update any old file locations.
updateOldFileLocations() updateOldFileLocations()
// Start account manager and open accounts.
go AcctMgr.Start() go AcctMgr.Start()
AcctMgr.OpenAccounts()
// Open all account saved to disk.
OpenAccounts()
// Read CA file to verify a btcd TLS connection. // Read CA file to verify a btcd TLS connection.
cafile, err := ioutil.ReadFile(cfg.CAFile) cafile, err := ioutil.ReadFile(cfg.CAFile)
@ -323,7 +322,7 @@ func OpenSavedAccount(name string, cfg *config) (*Account, error) {
} }
// OpenAccounts attempts to open all saved accounts. // OpenAccounts attempts to open all saved accounts.
func OpenAccounts() { func OpenAccounts() map[string]*Account {
// If the network (account) directory is missing, but the temporary // If the network (account) directory is missing, but the temporary
// directory exists, move it. This is unlikely to happen, but possible, // directory exists, move it. This is unlikely to happen, but possible,
// if writing out every account file at once to a tmp directory (as is // if writing out every account file at once to a tmp directory (as is
@ -335,7 +334,7 @@ func OpenAccounts() {
if !fileExists(netDir) && fileExists(tmpNetDir) { if !fileExists(netDir) && fileExists(tmpNetDir) {
if err := Rename(tmpNetDir, netDir); err != nil { if err := Rename(tmpNetDir, netDir); err != nil {
log.Errorf("Cannot move temporary network dir: %v", err) log.Errorf("Cannot move temporary network dir: %v", err)
return return nil
} }
} }
@ -346,13 +345,15 @@ func OpenAccounts() {
switch err.(type) { switch err.(type) {
case *WalletOpenError: case *WalletOpenError:
log.Errorf("Default account wallet file unreadable: %v", err) log.Errorf("Default account wallet file unreadable: %v", err)
return return nil
default: default:
log.Warnf("Non-critical problem opening an account file: %v", err) log.Warnf("Non-critical problem opening an account file: %v", err)
} }
} }
AcctMgr.AddAccount(a) accounts := map[string]*Account{
"": a,
}
// Read all filenames in the account directory, and look for any // Read all filenames in the account directory, and look for any
// filenames matching '*-wallet.bin'. These are wallets for // filenames matching '*-wallet.bin'. These are wallets for
@ -361,7 +362,7 @@ func OpenAccounts() {
if err != nil { if err != nil {
// Can't continue. // Can't continue.
log.Errorf("Unable to open account directory: %v", err) log.Errorf("Unable to open account directory: %v", err)
return return nil
} }
defer accountDir.Close() defer accountDir.Close()
fileNames, err := accountDir.Readdirnames(0) fileNames, err := accountDir.Readdirnames(0)
@ -370,20 +371,20 @@ func OpenAccounts() {
// at least try to open some accounts. // at least try to open some accounts.
log.Errorf("Unable to read all account files: %v", err) log.Errorf("Unable to read all account files: %v", err)
} }
var accounts []string var accountNames []string
for _, file := range fileNames { for _, file := range fileNames {
if strings.HasSuffix(file, "-wallet.bin") { if strings.HasSuffix(file, "-wallet.bin") {
name := strings.TrimSuffix(file, "-wallet.bin") name := strings.TrimSuffix(file, "-wallet.bin")
accounts = append(accounts, name) accountNames = append(accountNames, name)
} }
} }
// Open all additional accounts. // Open all additional accounts.
for _, a := range accounts { for _, acctName := range accountNames {
// Log txstore/utxostore errors as these will be recovered // Log txstore/utxostore errors as these will be recovered
// from with a rescan, but wallet errors must be returned // from with a rescan, but wallet errors must be returned
// to the caller. // to the caller.
a, err := OpenSavedAccount(a, cfg) a, err := OpenSavedAccount(acctName, cfg)
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case *WalletOpenError: case *WalletOpenError:
@ -393,9 +394,10 @@ func OpenAccounts() {
log.Warnf("Non-critical error opening an account file: %v", err) log.Warnf("Non-critical error opening an account file: %v", err)
} }
} else { } else {
AcctMgr.AddAccount(a) accounts[acctName] = a
} }
} }
return accounts
} }
var accessServer = make(chan *AccessCurrentServerConn) var accessServer = make(chan *AccessCurrentServerConn)

View file

@ -20,6 +20,10 @@ package main
import ( import (
"encoding/hex" "encoding/hex"
"fmt"
"sync"
"time"
"github.com/conformal/btcjson" "github.com/conformal/btcjson"
"github.com/conformal/btcscript" "github.com/conformal/btcscript"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
@ -27,8 +31,6 @@ import (
"github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
"github.com/conformal/btcws" "github.com/conformal/btcws"
"sync"
"time"
) )
func parseBlock(block *btcws.BlockDetails) (*tx.BlockDetails, error) { func parseBlock(block *btcws.BlockDetails) (*tx.BlockDetails, error) {
@ -47,7 +49,7 @@ func parseBlock(block *btcws.BlockDetails) (*tx.BlockDetails, error) {
}, nil }, nil
} }
type notificationHandler func(btcjson.Cmd) type notificationHandler func(btcjson.Cmd) error
var notificationHandlers = map[string]notificationHandler{ var notificationHandlers = map[string]notificationHandler{
btcws.BlockConnectedNtfnMethod: NtfnBlockConnected, btcws.BlockConnectedNtfnMethod: NtfnBlockConnected,
@ -57,36 +59,31 @@ var notificationHandlers = map[string]notificationHandler{
} }
// NtfnRecvTx handles the btcws.RecvTxNtfn notification. // NtfnRecvTx handles the btcws.RecvTxNtfn notification.
func NtfnRecvTx(n btcjson.Cmd) { func NtfnRecvTx(n btcjson.Cmd) error {
rtx, ok := n.(*btcws.RecvTxNtfn) rtx, ok := n.(*btcws.RecvTxNtfn)
if !ok { if !ok {
log.Errorf("%v handler: unexpected type", n.Method()) return fmt.Errorf("%v handler: unexpected type", n.Method())
return
} }
bs, err := GetCurBlock() bs, err := GetCurBlock()
if err != nil { if err != nil {
log.Errorf("%v handler: cannot get current block: %v", n.Method(), err) return fmt.Errorf("%v handler: cannot get current block: %v", n.Method(), err)
return
} }
rawTx, err := hex.DecodeString(rtx.HexTx) rawTx, err := hex.DecodeString(rtx.HexTx)
if err != nil { if err != nil {
log.Errorf("%v handler: bad hexstring: err", n.Method(), err) return fmt.Errorf("%v handler: bad hexstring: err", n.Method(), err)
return
} }
tx_, err := btcutil.NewTxFromBytes(rawTx) tx_, err := btcutil.NewTxFromBytes(rawTx)
if err != nil { if err != nil {
log.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err) return fmt.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err)
return
} }
var block *tx.BlockDetails var block *tx.BlockDetails
if rtx.Block != nil { if rtx.Block != nil {
block, err = parseBlock(rtx.Block) block, err = parseBlock(rtx.Block)
if err != nil { if err != nil {
log.Errorf("%v handler: bad block: %v", n.Method(), err) return fmt.Errorf("%v handler: bad block: %v", n.Method(), err)
return
} }
} }
@ -131,7 +128,10 @@ func NtfnRecvTx(n btcjson.Cmd) {
} }
for _, a := range accounts { for _, a := range accounts {
record := a.TxStore.InsertRecvTxOut(tx_, uint32(outIdx), false, received, block) record, err := a.TxStore.InsertRecvTxOut(tx_, uint32(outIdx), false, received, block)
if err != nil {
return err
}
AcctMgr.ds.ScheduleTxStoreWrite(a) AcctMgr.ds.ScheduleTxStoreWrite(a)
// Notify frontends of tx. If the tx is unconfirmed, it is always // Notify frontends of tx. If the tx is unconfirmed, it is always
@ -163,6 +163,8 @@ func NtfnRecvTx(n btcjson.Cmd) {
NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed) NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed)
} }
} }
return nil
} }
// NtfnBlockConnected handles btcd notifications resulting from newly // NtfnBlockConnected handles btcd notifications resulting from newly
@ -172,16 +174,14 @@ func NtfnRecvTx(n btcjson.Cmd) {
// to mark wallet files with a possibly-better earliest block height, // to mark wallet files with a possibly-better earliest block height,
// and will greatly reduce rescan times for wallets created with an // and will greatly reduce rescan times for wallets created with an
// out of sync btcd. // out of sync btcd.
func NtfnBlockConnected(n btcjson.Cmd) { func NtfnBlockConnected(n btcjson.Cmd) error {
bcn, ok := n.(*btcws.BlockConnectedNtfn) bcn, ok := n.(*btcws.BlockConnectedNtfn)
if !ok { if !ok {
log.Errorf("%v handler: unexpected type", n.Method()) return fmt.Errorf("%v handler: unexpected type", n.Method())
return
} }
hash, err := btcwire.NewShaHashFromStr(bcn.Hash) hash, err := btcwire.NewShaHashFromStr(bcn.Hash)
if err != nil { if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method()) return fmt.Errorf("%v handler: invalid hash string", n.Method())
return
} }
// Update the blockstamp for the newly-connected block. // Update the blockstamp for the newly-connected block.
@ -211,21 +211,21 @@ func NtfnBlockConnected(n btcjson.Cmd) {
// Pass notification to frontends too. // Pass notification to frontends too.
marshaled, _ := n.MarshalJSON() marshaled, _ := n.MarshalJSON()
allClients <- marshaled allClients <- marshaled
return nil
} }
// NtfnBlockDisconnected handles btcd notifications resulting from // NtfnBlockDisconnected handles btcd notifications resulting from
// blocks disconnected from the main chain in the event of a chain // blocks disconnected from the main chain in the event of a chain
// switch and notifies frontends of the new blockchain height. // switch and notifies frontends of the new blockchain height.
func NtfnBlockDisconnected(n btcjson.Cmd) { func NtfnBlockDisconnected(n btcjson.Cmd) error {
bdn, ok := n.(*btcws.BlockDisconnectedNtfn) bdn, ok := n.(*btcws.BlockDisconnectedNtfn)
if !ok { if !ok {
log.Errorf("%v handler: unexpected type", n.Method()) return fmt.Errorf("%v handler: unexpected type", n.Method())
return
} }
hash, err := btcwire.NewShaHashFromStr(bdn.Hash) hash, err := btcwire.NewShaHashFromStr(bdn.Hash)
if err != nil { if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method()) return fmt.Errorf("%v handler: invalid hash string", n.Method())
return
} }
// Rollback Utxo and Tx data stores. // Rollback Utxo and Tx data stores.
@ -234,32 +234,32 @@ func NtfnBlockDisconnected(n btcjson.Cmd) {
// Pass notification to frontends too. // Pass notification to frontends too.
marshaled, _ := n.MarshalJSON() marshaled, _ := n.MarshalJSON()
allClients <- marshaled allClients <- marshaled
return nil
} }
// NtfnRedeemingTx handles btcd redeemingtx notifications resulting from a // NtfnRedeemingTx handles btcd redeemingtx notifications resulting from a
// transaction spending a watched outpoint. // transaction spending a watched outpoint.
func NtfnRedeemingTx(n btcjson.Cmd) { func NtfnRedeemingTx(n btcjson.Cmd) error {
cn, ok := n.(*btcws.RedeemingTxNtfn) cn, ok := n.(*btcws.RedeemingTxNtfn)
if !ok { if !ok {
log.Errorf("%v handler: unexpected type", n.Method()) return fmt.Errorf("%v handler: unexpected type", n.Method())
return
} }
rawTx, err := hex.DecodeString(cn.HexTx) rawTx, err := hex.DecodeString(cn.HexTx)
if err != nil { if err != nil {
log.Errorf("%v handler: bad hexstring: err", n.Method(), err) return fmt.Errorf("%v handler: bad hexstring: err", n.Method(), err)
return
} }
tx_, err := btcutil.NewTxFromBytes(rawTx) tx_, err := btcutil.NewTxFromBytes(rawTx)
if err != nil { if err != nil {
log.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err) return fmt.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err)
return
} }
block, err := parseBlock(cn.Block) block, err := parseBlock(cn.Block)
if err != nil { if err != nil {
log.Errorf("%v handler: bad block: %v", n.Method(), err) return fmt.Errorf("%v handler: bad block: %v", n.Method(), err)
return
} }
AcctMgr.RecordSpendingTx(tx_, block) AcctMgr.RecordSpendingTx(tx_, block)
return nil
} }

View file

@ -110,6 +110,15 @@ func (btcd *BtcdRPCConn) Connected() bool {
} }
} }
// Close forces closing the current btcd connection.
func (btcd *BtcdRPCConn) Close() {
select {
case <-btcd.closed:
default:
close(btcd.closed)
}
}
// AddRPCRequest is used to add an RPCRequest to the pool of requests // AddRPCRequest is used to add an RPCRequest to the pool of requests
// being manaaged by a btcd RPC connection. // being manaaged by a btcd RPC connection.
type AddRPCRequest struct { type AddRPCRequest struct {

View file

@ -207,8 +207,31 @@ func WalletRequestProcessor() {
case n := <-handleNtfn: case n := <-handleNtfn:
if f, ok := notificationHandlers[n.Method()]; ok { if f, ok := notificationHandlers[n.Method()]; ok {
AcctMgr.Grab() AcctMgr.Grab()
f(n) err := f(n)
AcctMgr.Release() AcctMgr.Release()
switch err {
case nil:
// ignore
case tx.ErrInconsistantStore:
// Likely due to a mis-ordered btcd notification.
// To recover, close server connection and reopen
// all accounts from their last good state saved
// to disk. This will trigger the handshake on
// next connect, and a rescan of one or two blocks
// to catch up rather than throwing away all tx
// history and rescanning everything.
s := CurrentServerConn()
if btcd, ok := s.(*BtcdRPCConn); ok {
AcctMgr.Grab()
btcd.Close()
AcctMgr.OpenAccounts()
AcctMgr.Release()
}
default: // other non-nil
log.Warn(err)
}
} }
} }
} }
@ -1341,7 +1364,11 @@ func SendBeforeReceiveHistorySync(add, done, remove chan btcwire.ShaHash,
func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *CreatedTx) (interface{}, *btcjson.Error) { func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *CreatedTx) (interface{}, *btcjson.Error) {
// Add to transaction store. // Add to transaction store.
stx := a.TxStore.InsertSignedTx(txInfo.tx, nil) stx, err := a.TxStore.InsertSignedTx(txInfo.tx, nil)
if err != nil {
log.Warnf("Error adding sent tx history: %v", err)
return nil, &btcjson.ErrInternal
}
AcctMgr.ds.ScheduleTxStoreWrite(a) AcctMgr.ds.ScheduleTxStoreWrite(a)
// Notify frontends of new SendTx. // Notify frontends of new SendTx.

View file

@ -42,6 +42,10 @@ var (
// object is marked with a version that is no longer supported // object is marked with a version that is no longer supported
// during deserialization. // during deserialization.
ErrUnsupportedVersion = errors.New("version no longer supported") ErrUnsupportedVersion = errors.New("version no longer supported")
// ErrInconsistantStore represents an error for when an inconsistancy
// is detected during inserting or returning transaction records.
ErrInconsistantStore = errors.New("inconsistant transaction store")
) )
// Record is a common interface shared by SignedTx and RecvTxOut transaction // Record is a common interface shared by SignedTx and RecvTxOut transaction
@ -264,7 +268,7 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
// It is an error for the backing transaction to have // It is an error for the backing transaction to have
// not already been read. // not already been read.
if _, ok := s.txs[rtx.blockTx()]; !ok { if _, ok := s.txs[rtx.blockTx()]; !ok {
return n64, errors.New("missing backing transaction") return n64, ErrInconsistantStore
} }
// Add entries to store. // Add entries to store.
@ -290,7 +294,7 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
// It is an error for the backing transaction to have // It is an error for the backing transaction to have
// not already been read. // not already been read.
if _, ok := s.txs[stx.blockTx()]; !ok { if _, ok := s.txs[stx.blockTx()]; !ok {
return n64, errors.New("missing backing transaction") return n64, ErrInconsistantStore
} }
// Add entries to store. // Add entries to store.
@ -371,7 +375,7 @@ func (s *Store) WriteTo(w io.Writer) (int64, error) {
// store, returning the record. Duplicates and double spend correction is // store, returning the record. Duplicates and double spend correction is
// handled automatically. Transactions may be added without block details, // handled automatically. Transactions may be added without block details,
// and later added again with block details once the tx has been mined. // and later added again with block details once the tx has been mined.
func (s *Store) InsertSignedTx(tx *btcutil.Tx, block *BlockDetails) *SignedTx { func (s *Store) InsertSignedTx(tx *btcutil.Tx, block *BlockDetails) (*SignedTx, error) {
var created time.Time var created time.Time
if block == nil { if block == nil {
created = time.Now() created = time.Now()
@ -387,8 +391,11 @@ func (s *Store) InsertSignedTx(tx *btcutil.Tx, block *BlockDetails) *SignedTx {
block: block, block: block,
} }
s.insertTx(tx, st) err := s.insertTx(tx, st)
return st.record(s).(*SignedTx) if err != nil {
return nil, ErrInconsistantStore
}
return st.record(s).(*SignedTx), nil
} }
// Rollback removes block details for all transactions at or beyond a // Rollback removes block details for all transactions at or beyond a
@ -398,33 +405,36 @@ func (s *Store) InsertSignedTx(tx *btcutil.Tx, block *BlockDetails) *SignedTx {
// chain are added to the store. // chain are added to the store.
func (s *Store) Rollback(height int32) { func (s *Store) Rollback(height int32) {
for e := s.sorted.Front(); e != nil; e = e.Next() { for e := s.sorted.Front(); e != nil; e = e.Next() {
tx := e.Value.(txRecord) record := e.Value.(txRecord)
if details := tx.Block(); details != nil { block := record.Block()
txSha := tx.TxSha() if block == nil {
oldKey := blockTx{*txSha, details.Height} // Unmined, no block details to remove.
if details.Height >= height { continue
tx.setBlock(nil) }
txSha := record.TxSha()
if block.Height >= height {
oldKey := blockTx{*txSha, block.Height}
record.setBlock(nil)
switch v := tx.(type) { switch v := record.(type) {
case *signedTx: case *signedTx:
k := oldKey k := oldKey
delete(s.signed, k) delete(s.signed, k)
k.height = -1 k.height = -1
s.signed[k] = v s.signed[k] = v
case *recvTxOut: case *recvTxOut:
k := blockOutPoint{v.outpoint, details.Height} k := blockOutPoint{v.outpoint, block.Height}
delete(s.recv, k) delete(s.recv, k)
k.height = -1 k.height = -1
s.recv[k] = v s.recv[k] = v
} }
if utx, ok := s.txs[oldKey]; ok { if utx, ok := s.txs[oldKey]; ok {
k := oldKey k := oldKey
delete(s.txs, k) delete(s.txs, k)
k.height = -1 k.height = -1
s.txs[k] = utx s.txs[k] = utx
}
} }
} }
} }
@ -449,7 +459,7 @@ func (s *Store) UnminedSignedTxs() []*btcutil.Tx {
// with non-nil BlockDetails to update the record and all other records // with non-nil BlockDetails to update the record and all other records
// using the transaction with the block. // using the transaction with the block.
func (s *Store) InsertRecvTxOut(tx *btcutil.Tx, outIdx uint32, func (s *Store) InsertRecvTxOut(tx *btcutil.Tx, outIdx uint32,
change bool, received time.Time, block *BlockDetails) *RecvTxOut { change bool, received time.Time, block *BlockDetails) (*RecvTxOut, error) {
rt := &recvTxOut{ rt := &recvTxOut{
outpoint: *btcwire.NewOutPoint(tx.Sha(), outIdx), outpoint: *btcwire.NewOutPoint(tx.Sha(), outIdx),
@ -457,17 +467,27 @@ func (s *Store) InsertRecvTxOut(tx *btcutil.Tx, outIdx uint32,
received: received, received: received,
block: block, block: block,
} }
s.insertTx(tx, rt) err := s.insertTx(tx, rt)
return rt.record(s).(*RecvTxOut) if err != nil {
return nil, err
}
return rt.record(s).(*RecvTxOut), nil
} }
func (s *Store) insertTx(utx *btcutil.Tx, record txRecord) { func (s *Store) insertTx(utx *btcutil.Tx, record txRecord) error {
if ds := s.findDoubleSpend(utx); ds != nil { if ds := s.findDoubleSpend(utx); ds != nil {
switch { switch {
case ds.txSha == *utx.Sha(): // identical tx case ds.txSha == *utx.Sha(): // identical tx
if ds.height != record.Height() { if ds.height != record.Height() {
s.setTxBlock(utx.Sha(), record.Block()) // Detect insert inconsistancies. If matching
return // tx was found, but this record's block is unset,
// a rollback was missed.
block := record.Block()
if block == nil {
return ErrInconsistantStore
}
s.setTxBlock(utx.Sha(), block)
return nil
} }
default: default:
@ -479,6 +499,7 @@ func (s *Store) insertTx(utx *btcutil.Tx, record txRecord) {
} }
s.insertUniqueTx(utx, record) s.insertUniqueTx(utx, record)
return nil
} }
func (s *Store) insertUniqueTx(utx *btcutil.Tx, record txRecord) { func (s *Store) insertUniqueTx(utx *btcutil.Tx, record txRecord) {
@ -605,11 +626,6 @@ func (s *Store) removeDoubleSpends(oldKey *blockTx) {
} }
func (s *Store) setTxBlock(txSha *btcwire.ShaHash, block *BlockDetails) { func (s *Store) setTxBlock(txSha *btcwire.ShaHash, block *BlockDetails) {
if block == nil {
// Nothing to update.
return
}
// Lookup unmined backing tx. // Lookup unmined backing tx.
prevKey := blockTx{*txSha, -1} prevKey := blockTx{*txSha, -1}
tx := s.txs[prevKey] tx := s.txs[prevKey]

View file

@ -78,16 +78,18 @@ func TestTxStore(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
f func(*Store) *Store f func(*Store) (*Store, error)
err error
bal, unc int64 bal, unc int64
unspents map[btcwire.OutPoint]struct{} unspents map[btcwire.OutPoint]struct{}
unmined map[btcwire.ShaHash]struct{} unmined map[btcwire.ShaHash]struct{}
}{ }{
{ {
name: "new store", name: "new store",
f: func(_ *Store) *Store { f: func(_ *Store) (*Store, error) {
return NewStore() return NewStore(), nil
}, },
err: nil,
bal: 0, bal: 0,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{}, unspents: map[btcwire.OutPoint]struct{}{},
@ -95,10 +97,36 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "txout insert", name: "txout insert",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstRecvTx, 0, false, time.Now(), nil) r, err := s.InsertRecvTxOut(TstRecvTx, 0, false, time.Now(), nil)
return s if err != nil {
return nil, err
}
// If the above succeeded, try using the record. This will
// dereference the tx and panic if the above didn't catch
// an inconsistant insert.
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: 0,
unc: TstRecvTx.MsgTx().TxOut[0].Value,
unspents: map[btcwire.OutPoint]struct{}{
*btcwire.NewOutPoint(TstRecvTx.Sha(), 0): struct{}{},
},
unmined: map[btcwire.ShaHash]struct{}{},
},
{
name: "insert duplicate unconfirmed",
f: func(s *Store) (*Store, error) {
r, err := s.InsertRecvTxOut(TstRecvTx, 0, false, time.Now(), nil)
if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
},
err: nil,
bal: 0, bal: 0,
unc: TstRecvTx.MsgTx().TxOut[0].Value, unc: TstRecvTx.MsgTx().TxOut[0].Value,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -108,10 +136,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "confirmed txout insert", name: "confirmed txout insert",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstRecvTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails) r, err := s.InsertRecvTxOut(TstRecvTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: TstRecvTx.MsgTx().TxOut[0].Value, bal: TstRecvTx.MsgTx().TxOut[0].Value,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -121,10 +154,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert duplicate confirmed", name: "insert duplicate confirmed",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstRecvTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails) r, err := s.InsertRecvTxOut(TstRecvTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: TstRecvTx.MsgTx().TxOut[0].Value, bal: TstRecvTx.MsgTx().TxOut[0].Value,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -134,23 +172,27 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert duplicate unconfirmed", name: "insert duplicate unconfirmed",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstRecvTx, 0, false, time.Now(), nil) r, err := s.InsertRecvTxOut(TstRecvTx, 0, false, time.Now(), nil)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
bal: TstRecvTx.MsgTx().TxOut[0].Value, err: ErrInconsistantStore,
unc: 0,
unspents: map[btcwire.OutPoint]struct{}{
*btcwire.NewOutPoint(TstRecvTx.Sha(), 0): struct{}{},
},
unmined: map[btcwire.ShaHash]struct{}{},
}, },
{ {
name: "insert double spend with new txout value", name: "insert double spend with new txout value",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstDoubleSpendTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails) r, err := s.InsertRecvTxOut(TstDoubleSpendTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: TstDoubleSpendTx.MsgTx().TxOut[0].Value, bal: TstDoubleSpendTx.MsgTx().TxOut[0].Value,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -160,10 +202,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert unconfirmed signed tx", name: "insert unconfirmed signed tx",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertSignedTx(TstSpendingTx, nil) r, err := s.InsertSignedTx(TstSpendingTx, nil)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: 0, bal: 0,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{}, unspents: map[btcwire.OutPoint]struct{}{},
@ -173,10 +220,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert unconfirmed signed tx again", name: "insert unconfirmed signed tx again",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertSignedTx(TstSpendingTx, nil) r, err := s.InsertSignedTx(TstSpendingTx, nil)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: 0, bal: 0,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{}, unspents: map[btcwire.OutPoint]struct{}{},
@ -186,10 +238,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert change (index 0)", name: "insert change (index 0)",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstSpendingTx, 0, true, time.Now(), nil) r, err := s.InsertRecvTxOut(TstSpendingTx, 0, true, time.Now(), nil)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: 0, bal: 0,
unc: TstSpendingTx.MsgTx().TxOut[0].Value, unc: TstSpendingTx.MsgTx().TxOut[0].Value,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -201,10 +258,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert output back to this own wallet (index 1)", name: "insert output back to this own wallet (index 1)",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstSpendingTx, 1, true, time.Now(), nil) r, err := s.InsertRecvTxOut(TstSpendingTx, 1, true, time.Now(), nil)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: 0, bal: 0,
unc: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value, unc: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -217,10 +279,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "confirmed signed tx", name: "confirmed signed tx",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertSignedTx(TstSpendingTx, TstSignedTxBlockDetails) r, err := s.InsertSignedTx(TstSpendingTx, TstSignedTxBlockDetails)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value, bal: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -231,10 +298,11 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "rollback after spending tx", name: "rollback after spending tx",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.Rollback(TstSignedTxBlockDetails.Height + 1) s.Rollback(TstSignedTxBlockDetails.Height + 1)
return s return s, nil
}, },
err: nil,
bal: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value, bal: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -245,10 +313,11 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "rollback spending tx block", name: "rollback spending tx block",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.Rollback(TstSignedTxBlockDetails.Height) s.Rollback(TstSignedTxBlockDetails.Height)
return s return s, nil
}, },
err: nil,
bal: 0, bal: 0,
unc: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value, unc: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -261,10 +330,11 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "rollback double spend tx block", name: "rollback double spend tx block",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.Rollback(TstRecvTxBlockDetails.Height) s.Rollback(TstRecvTxBlockDetails.Height)
return s return s, nil
}, },
err: nil,
bal: 0, bal: 0,
unc: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value, unc: TstSpendingTx.MsgTx().TxOut[0].Value + TstSpendingTx.MsgTx().TxOut[1].Value,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -277,10 +347,15 @@ func TestTxStore(t *testing.T) {
}, },
{ {
name: "insert original recv txout", name: "insert original recv txout",
f: func(s *Store) *Store { f: func(s *Store) (*Store, error) {
s.InsertRecvTxOut(TstRecvTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails) r, err := s.InsertRecvTxOut(TstRecvTx, 0, false, TstRecvTxBlockDetails.Time, TstRecvTxBlockDetails)
return s if err != nil {
return nil, err
}
_ = r.TxInfo("", 100, btcwire.MainNet)
return s, nil
}, },
err: nil,
bal: TstRecvTx.MsgTx().TxOut[0].Value, bal: TstRecvTx.MsgTx().TxOut[0].Value,
unc: 0, unc: 0,
unspents: map[btcwire.OutPoint]struct{}{ unspents: map[btcwire.OutPoint]struct{}{
@ -292,10 +367,17 @@ func TestTxStore(t *testing.T) {
var s *Store var s *Store
for _, test := range tests { for _, test := range tests {
s = test.f(s) tmpStore, err := test.f(s)
if err != test.err {
t.Fatalf("%s: error mismatch: expected: %v, got: %v", test.name, test.err, err)
}
if test.err != nil {
continue
}
s = tmpStore
bal := s.Balance(1, TstRecvCurrentHeight) bal := s.Balance(1, TstRecvCurrentHeight)
if bal != test.bal { if bal != test.bal {
t.Errorf("%s: balance mismatch: expected %d, got %d", test.name, test.bal, bal) t.Errorf("%s: balance mismatch: expected: %d, got: %d", test.name, test.bal, bal)
} }
unc := s.Balance(0, TstRecvCurrentHeight) - bal unc := s.Balance(0, TstRecvCurrentHeight) - bal
if unc != test.unc { if unc != test.unc {