diff --git a/cmd.go b/cmd.go index e1bd652..4515cfe 100644 --- a/cmd.go +++ b/cmd.go @@ -181,11 +181,6 @@ func main() { // Start HTTP server to serve wallet client connections. server.Start() - // Begin maintanence goroutines. - go StoreNotifiedMempoolRecvTxs(NotifiedRecvTxChans.add, - NotifiedRecvTxChans.remove, - NotifiedRecvTxChans.access) - // Start client connection to a btcd chain server. Attempt // reconnections if the client could not be successfully connected. clientChan := make(chan *rpcClient) diff --git a/rpcclient.go b/rpcclient.go index 6c9129c..d54f7f0 100644 --- a/rpcclient.go +++ b/rpcclient.go @@ -185,10 +185,12 @@ func (n recvTx) handleNotification() error { AcctMgr.Grab() defer AcctMgr.Release() - // For every output, find all accounts handling that output address (if any) - // and record the received txout. - for outIdx, txout := range n.tx.MsgTx().TxOut { - var accounts []*Account + // For every output, if it pays to a wallet address, insert the + // transaction into the store (possibly moving it from unconfirmed to + // confirmed), and add a credit record if one does not already exist. + var txr *txstore.TxRecord + txInserted := false + for i, txout := range n.tx.MsgTx().TxOut { // Errors don't matter here. If addrs is nil, the range below // does nothing. _, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript, @@ -196,53 +198,37 @@ func (n recvTx) handleNotification() error { for _, addr := range addrs { a, err := AcctMgr.AccountByAddress(addr) if err != nil { - continue + continue // try next address, if any } - accounts = append(accounts, a) - } - for _, a := range accounts { - txr, err := a.TxStore.InsertTx(n.tx, block) - if err != nil { - return err + if !txInserted { + txr, err = a.TxStore.InsertTx(n.tx, block) + if err != nil { + return err + } + txInserted = true } - cred, err := txr.AddCredit(uint32(outIdx), false) + + // Insert and notify websocket clients of the credit if it is + // not a duplicate, otherwise, check the next txout if the + // credit has already been inserted. + if txr.HasCredit(i) { + break + } + cred, err := txr.AddCredit(uint32(i), false) if err != nil { return err } AcctMgr.ds.ScheduleTxStoreWrite(a) - - // Notify wallet clients of tx. If the tx is unconfirmed, it is always - // notified and the outpoint is marked as notified. If the outpoint - // has already been notified and is now in a block, a txmined notifiction - // should be sent once to let wallet clients that all previous send/recvs - // for this unconfirmed tx are now confirmed. - op := *cred.OutPoint() - previouslyNotifiedReq := NotifiedRecvTxRequest{ - op: op, - response: make(chan NotifiedRecvTxResponse), + ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) + if err != nil { + return err } - NotifiedRecvTxChans.access <- previouslyNotifiedReq - if <-previouslyNotifiedReq.response { - NotifiedRecvTxChans.remove <- op - } else { - // Notify clients of new recv tx and mark as notified. - NotifiedRecvTxChans.add <- op - - ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) - if err != nil { - return err - } - server.NotifyNewTxDetails(a.Name(), ltr) - } - - // Notify clients of new account balance. - confirmed := a.CalculateBalance(1) - unconfirmed := a.CalculateBalance(0) - confirmed - server.NotifyWalletBalance(a.name, confirmed) - server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed) + server.NotifyNewTxDetails(a.Name(), ltr) + break // check whether next txout is a wallet txout } } + server.NotifyBalances() return nil } @@ -462,7 +448,7 @@ func (c *rpcClient) Handshake() error { } if server != nil { server.NotifyNewBlockChainHeight(&bs) - server.NotifyBalances(nil) + server.NotifyBalances() } // Get default account. Only the default account is used to diff --git a/rpcserver.go b/rpcserver.go index 9e43531..b8c446b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1369,7 +1369,7 @@ func (s *rpcServer) NotifyNewBlockChainHeight(bs *wallet.BlockStamp) { // TODO(jrick): Switch this to return a single JSON object // (map[string]interface{}) of all accounts and their balances, instead of // separate notifications for each account. -func (s *rpcServer) NotifyBalances(wsc *websocketClient) { +func (s *rpcServer) NotifyBalances() { for _, a := range AcctMgr.AllAccounts() { balance := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - balance @@ -2687,49 +2687,3 @@ func (s *rpcServer) NotifyNewTxDetails(account string, details btcjson.ListTrans } s.broadcasts <- mntfn } - -// NotifiedRecvTxRequest is used to check whether the outpoint of -// a received transaction has already been notified due to -// arriving first in the btcd mempool. -type NotifiedRecvTxRequest struct { - op btcwire.OutPoint - response chan NotifiedRecvTxResponse -} - -// NotifiedRecvTxResponse is the response of a NotifiedRecvTxRequest -// request. -type NotifiedRecvTxResponse bool - -// NotifiedRecvTxChans holds the channels to manage -// StoreNotifiedMempoolTxs. -var NotifiedRecvTxChans = struct { - add, remove chan btcwire.OutPoint - access chan NotifiedRecvTxRequest -}{ - add: make(chan btcwire.OutPoint), - remove: make(chan btcwire.OutPoint), - access: make(chan NotifiedRecvTxRequest), -} - -// StoreNotifiedMempoolRecvTxs maintains a set of previously-sent -// received transaction notifications originating from the btcd -// mempool. This is used to prevent duplicate client transaction -// notifications once a mempool tx is mined into a block. -func StoreNotifiedMempoolRecvTxs(add, remove chan btcwire.OutPoint, - access chan NotifiedRecvTxRequest) { - - m := make(map[btcwire.OutPoint]struct{}) - for { - select { - case op := <-add: - m[op] = struct{}{} - - case op := <-remove: - delete(m, op) - - case req := <-access: - _, ok := m[req.op] - req.response <- NotifiedRecvTxResponse(ok) - } - } -} diff --git a/txstore/serialization.go b/txstore/serialization.go index 3935a90..3a4b842 100644 --- a/txstore/serialization.go +++ b/txstore/serialization.go @@ -447,12 +447,12 @@ func (t *txRecord) ReadFrom(r io.Reader) (int64, error) { // For each expected output key, allocate and read the key, // appending the result to the spends slice. This slice is - // originally set to nil (*not* preallocated to spendsCount + // originally set empty (*not* preallocated to spendsCount // size) to prevent accidentally allocating so much memory that // the process dies. - var spends []*BlockOutputKey + var spends []BlockOutputKey for i := uint32(0); i < spendsCount; i++ { - k := &BlockOutputKey{} + k := BlockOutputKey{} tmpn64, err := k.ReadFrom(r) n64 += tmpn64 if err != nil { diff --git a/txstore/tx.go b/txstore/tx.go index 00b7549..5c547bf 100644 --- a/txstore/tx.go +++ b/txstore/tx.go @@ -244,7 +244,7 @@ type txRecord struct { // transaction credits. type debits struct { amount btcutil.Amount - spends []*BlockOutputKey + spends []BlockOutputKey } // credit describes a transaction output which was or is spendable by wallet. @@ -404,37 +404,6 @@ func (u *unconfirmedStore) txRecordForInserts(tx *btcutil.Tx) *txRecord { return r } -func (r *txRecord) setDebitsSpends(spends []*BlockOutputKey, tx *btcutil.Tx) error { - if r.debits.spends != nil { - if *r.tx.Sha() == *tx.Sha() { - return ErrDuplicateInsert - } - return ErrInconsistentStore - } - r.debits.spends = spends - return nil -} - -func (r *txRecord) setCredit(c *credit, index uint32, tx *btcutil.Tx) error { - if len(r.credits) <= int(index) { - r.credits = extendCredits(r.credits, index) - } - if r.credits[index] != nil { - if *r.tx.Sha() == *tx.Sha() { - return ErrDuplicateInsert - } - return ErrInconsistentStore - } - - r.credits[index] = c - return nil -} - -func extendCredits(c []*credit, index uint32) []*credit { - missing := make([]*credit, int(index+1)-len(c)) - return append(c, missing...) -} - func (s *Store) moveMinedTx(r *txRecord, block *Block) error { delete(s.unconfirmed.txs, *r.Tx().Sha()) @@ -465,12 +434,9 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error { if err != nil { return err } - if len(rr.credits) <= int(prev.OutputIndex) { - rr.credits = extendCredits(rr.credits, prev.OutputIndex) - } rr.credits[prev.OutputIndex].spentBy = &key // debits should already be non-nil - r.debits.spends = append(r.debits.spends, &prev) + r.debits.spends = append(r.debits.spends, prev) } // For each credit in r, if the credit is spent by another unconfirmed @@ -483,20 +449,21 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error { // If the credit is not spent, modify the store's unspent bookkeeping // maps to include the credit and increment the amount deltas by the // credit's value. + op := btcwire.OutPoint{Hash: *r.Tx().Sha()} for i, credit := range r.credits { if credit == nil { continue } - op := btcwire.NewOutPoint(r.Tx().Sha(), uint32(i)) - outputKey := BlockOutputKey{key, uint32(i)} - if rr, ok := s.unconfirmed.spentUnconfirmed[*op]; ok { - delete(s.unconfirmed.spentUnconfirmed, *op) - s.unconfirmed.spentBlockOutPointKeys[*op] = outputKey + op.Index = uint32(i) + outputKey := BlockOutputKey{key, op.Index} + if rr, ok := s.unconfirmed.spentUnconfirmed[op]; ok { + delete(s.unconfirmed.spentUnconfirmed, op) + s.unconfirmed.spentBlockOutPointKeys[op] = outputKey s.unconfirmed.spentBlockOutPoints[outputKey] = rr credit.spentBy = &BlockTxKey{BlockHeight: -1} } else if credit.spentBy == nil { // Mark outpoint unspent. - s.unspent[*op] = key + s.unspent[op] = key // Increment spendable amount delta as a result of // moving this credit to this block. @@ -541,10 +508,9 @@ func (s *Store) InsertTx(tx *btcutil.Tx, block *Block) (*TxRecord, error) { // Simply create or return the transaction record if this transaction // is unconfirmed. if block == nil { - key := BlockTxKey{BlockHeight: -1} r := s.unconfirmed.txRecordForInserts(tx) r.received = received - return &TxRecord{key, r, s}, nil + return &TxRecord{BlockTxKey{BlockHeight: -1}, r, s}, nil } // Check if block records already exist for this tx. If so, @@ -641,24 +607,18 @@ func (t *TxRecord) AddDebits(spent []Credit) (Debits, error) { t.s.unconfirmed.spentUnconfirmed[*op] = t.txRecord default: key := c.outputKey() - t.s.unconfirmed.spentBlockOutPointKeys[*op] = *key - t.s.unconfirmed.spentBlockOutPoints[*key] = t.txRecord + t.s.unconfirmed.spentBlockOutPointKeys[*op] = key + t.s.unconfirmed.spentBlockOutPoints[key] = t.txRecord } } default: if t.debits.spends == nil { - prevOutputKeys := make([]*BlockOutputKey, 0, len(spent)) - for _, c := range spent { - prevOutputKeys = append(prevOutputKeys, c.outputKey()) - } - err := t.txRecord.setDebitsSpends(prevOutputKeys, t.tx) - if err != nil { - if err == ErrDuplicateInsert { - return Debits{t}, nil - } - return Debits{}, err + prevOutputKeys := make([]BlockOutputKey, len(spent)) + for i, c := range spent { + prevOutputKeys[i] = c.outputKey() } + t.txRecord.debits.spends = prevOutputKeys } } return Debits{t}, nil @@ -728,10 +688,9 @@ func (s *Store) markOutputsSpent(spent []Credit, t *TxRecord) (btcutil.Amount, e credit.spentBy = &t.BlockTxKey delete(s.unspent, *op) if t.BlockHeight == -1 { // unconfirmed - op := prev.OutPoint() key := prev.outputKey() - s.unconfirmed.spentBlockOutPointKeys[*op] = *key - s.unconfirmed.spentBlockOutPoints[*key] = t.txRecord + s.unconfirmed.spentBlockOutPointKeys[*op] = key + s.unconfirmed.spentBlockOutPoints[key] = t.txRecord } // Increment total debited amount. @@ -752,6 +711,23 @@ func (s *Store) markOutputsSpent(spent []Credit, t *TxRecord) (btcutil.Amount, e return a, nil } +func (r *txRecord) setCredit(index uint32, change bool, tx *btcutil.Tx) error { + if r.credits == nil { + r.credits = make([]*credit, 0, len(tx.MsgTx().TxOut)) + } + for i := uint32(len(r.credits)); i <= index; i++ { + r.credits = append(r.credits, nil) + } + if r.credits[index] != nil { + if *r.tx.Sha() == *tx.Sha() { + return ErrDuplicateInsert + } + return ErrInconsistentStore + } + r.credits[index] = &credit{change: change} + return nil +} + // AddCredit marks the transaction record as containing a transaction output // spendable by wallet. The output is added unspent, and is marked spent // when a new transaction spending the output is inserted into the store. @@ -760,8 +736,7 @@ func (t *TxRecord) AddCredit(index uint32, change bool) (Credit, error) { return Credit{}, errors.New("transaction output does not exist") } - c := &credit{change: change} - if err := t.txRecord.setCredit(c, index, t.tx); err != nil { + if err := t.txRecord.setCredit(index, change, t.tx); err != nil { if err == ErrDuplicateInsert { return Credit{t, index}, nil } @@ -873,7 +848,7 @@ func (s *Store) Rollback(height int32) error { current := BlockOutputKey{ BlockTxKey: BlockTxKey{BlockHeight: -1}, } - err = spender.swapDebits(&prev, ¤t) + err = spender.swapDebits(prev, current) if err != nil { return err } @@ -890,7 +865,7 @@ func (s *Store) Rollback(height int32) error { if err != nil { return err } - c, err := rr.lookupBlockCredit(*prev) + c, err := rr.lookupBlockCredit(prev) if err != nil { return err } @@ -898,8 +873,8 @@ func (s *Store) Rollback(height int32) error { Hash: *rr.Tx().Sha(), Index: prev.OutputIndex, } - s.unconfirmed.spentBlockOutPointKeys[op] = *prev - s.unconfirmed.spentBlockOutPoints[*prev] = r + s.unconfirmed.spentBlockOutPointKeys[op] = prev + s.unconfirmed.spentBlockOutPoints[prev] = r c.spentBy = &BlockTxKey{BlockHeight: -1} } @@ -913,18 +888,15 @@ func (s *Store) Rollback(height int32) error { return nil } -func (r *txRecord) swapDebits(previous, current *BlockOutputKey) error { +func (r *txRecord) swapDebits(previous, current BlockOutputKey) error { for i, outputKey := range r.debits.spends { - if outputKey == nil { - continue - } - if *outputKey == *previous { + if outputKey == previous { r.debits.spends[i] = current return nil } } - return MissingCreditError(*previous) + return MissingCreditError(previous) } // UnminedDebitTxs returns the underlying transactions for all wallet @@ -1034,7 +1006,7 @@ func (s *Store) UnspentOutputs() ([]Credit, error) { i := 0 for op, key := range s.unspent { creditChans[i] = make(chan createdCredit) - go func(i int, opIndex uint32) { + go func(i int, key BlockTxKey, opIndex uint32) { r, err := s.lookupBlockTx(key) if err != nil { creditChans[i] <- createdCredit{err: err} @@ -1043,7 +1015,7 @@ func (s *Store) UnspentOutputs() ([]Credit, error) { t := &TxRecord{key, r, s} c := Credit{t, opIndex} creditChans[i] <- createdCredit{credit: c} - }(i, op.Index) + }(i, key, op.Index) i++ } @@ -1276,6 +1248,15 @@ func (t *TxRecord) Credits() []Credit { return credits } +// HasCredit returns whether the transaction output at the passed index is +// a wallet credit. +func (t *TxRecord) HasCredit(i int) bool { + if len(t.credits) <= i { + return false + } + return t.credits[i] != nil +} + // InputAmount returns the total amount debited from previous credits. func (d Debits) InputAmount() btcutil.Amount { return d.txRecord.debits.amount @@ -1347,8 +1328,8 @@ func (c Credit) OutPoint() *btcwire.OutPoint { } // outputKey creates and returns the block lookup key for this credit. -func (c Credit) outputKey() *BlockOutputKey { - return &BlockOutputKey{ +func (c Credit) outputKey() BlockOutputKey { + return BlockOutputKey{ BlockTxKey: c.BlockTxKey, OutputIndex: c.OutputIndex, }