From 632148ed55d770f863c966e4dd9c7037daa1b1c6 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 18 Jun 2014 16:46:18 -0500 Subject: [PATCH] Fix various issues found by profiling. This commit is the result of inspecting the results of both cpu and memory profiling, to improve areas where wallet can be more efficient on transaction inserts. One problem that's very evident by profiling is how much waiting there is for file (txstore, wallet) writes. This commit does not attempt to fix this yet, but focuses on the easier-to-fix memory allocation issues which can slow down the rest of wallet due to excessive garbage collection scanning. While here, fix a race where a closure run as a goroutine was closing over a range iterator. --- cmd.go | 5 -- rpcclient.go | 70 +++++++++------------ rpcserver.go | 48 +------------- txstore/serialization.go | 6 +- txstore/tx.go | 131 +++++++++++++++++---------------------- 5 files changed, 88 insertions(+), 172 deletions(-) diff --git a/cmd.go b/cmd.go index e1bd652..4515cfe 100644 --- a/cmd.go +++ b/cmd.go @@ -181,11 +181,6 @@ func main() { // Start HTTP server to serve wallet client connections. server.Start() - // Begin maintanence goroutines. - go StoreNotifiedMempoolRecvTxs(NotifiedRecvTxChans.add, - NotifiedRecvTxChans.remove, - NotifiedRecvTxChans.access) - // Start client connection to a btcd chain server. Attempt // reconnections if the client could not be successfully connected. clientChan := make(chan *rpcClient) diff --git a/rpcclient.go b/rpcclient.go index 6c9129c..d54f7f0 100644 --- a/rpcclient.go +++ b/rpcclient.go @@ -185,10 +185,12 @@ func (n recvTx) handleNotification() error { AcctMgr.Grab() defer AcctMgr.Release() - // For every output, find all accounts handling that output address (if any) - // and record the received txout. - for outIdx, txout := range n.tx.MsgTx().TxOut { - var accounts []*Account + // For every output, if it pays to a wallet address, insert the + // transaction into the store (possibly moving it from unconfirmed to + // confirmed), and add a credit record if one does not already exist. + var txr *txstore.TxRecord + txInserted := false + for i, txout := range n.tx.MsgTx().TxOut { // Errors don't matter here. If addrs is nil, the range below // does nothing. _, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript, @@ -196,53 +198,37 @@ func (n recvTx) handleNotification() error { for _, addr := range addrs { a, err := AcctMgr.AccountByAddress(addr) if err != nil { - continue + continue // try next address, if any } - accounts = append(accounts, a) - } - for _, a := range accounts { - txr, err := a.TxStore.InsertTx(n.tx, block) - if err != nil { - return err + if !txInserted { + txr, err = a.TxStore.InsertTx(n.tx, block) + if err != nil { + return err + } + txInserted = true } - cred, err := txr.AddCredit(uint32(outIdx), false) + + // Insert and notify websocket clients of the credit if it is + // not a duplicate, otherwise, check the next txout if the + // credit has already been inserted. + if txr.HasCredit(i) { + break + } + cred, err := txr.AddCredit(uint32(i), false) if err != nil { return err } AcctMgr.ds.ScheduleTxStoreWrite(a) - - // Notify wallet clients of tx. If the tx is unconfirmed, it is always - // notified and the outpoint is marked as notified. If the outpoint - // has already been notified and is now in a block, a txmined notifiction - // should be sent once to let wallet clients that all previous send/recvs - // for this unconfirmed tx are now confirmed. - op := *cred.OutPoint() - previouslyNotifiedReq := NotifiedRecvTxRequest{ - op: op, - response: make(chan NotifiedRecvTxResponse), + ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) + if err != nil { + return err } - NotifiedRecvTxChans.access <- previouslyNotifiedReq - if <-previouslyNotifiedReq.response { - NotifiedRecvTxChans.remove <- op - } else { - // Notify clients of new recv tx and mark as notified. - NotifiedRecvTxChans.add <- op - - ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) - if err != nil { - return err - } - server.NotifyNewTxDetails(a.Name(), ltr) - } - - // Notify clients of new account balance. - confirmed := a.CalculateBalance(1) - unconfirmed := a.CalculateBalance(0) - confirmed - server.NotifyWalletBalance(a.name, confirmed) - server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed) + server.NotifyNewTxDetails(a.Name(), ltr) + break // check whether next txout is a wallet txout } } + server.NotifyBalances() return nil } @@ -462,7 +448,7 @@ func (c *rpcClient) Handshake() error { } if server != nil { server.NotifyNewBlockChainHeight(&bs) - server.NotifyBalances(nil) + server.NotifyBalances() } // Get default account. Only the default account is used to diff --git a/rpcserver.go b/rpcserver.go index 9e43531..b8c446b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1369,7 +1369,7 @@ func (s *rpcServer) NotifyNewBlockChainHeight(bs *wallet.BlockStamp) { // TODO(jrick): Switch this to return a single JSON object // (map[string]interface{}) of all accounts and their balances, instead of // separate notifications for each account. -func (s *rpcServer) NotifyBalances(wsc *websocketClient) { +func (s *rpcServer) NotifyBalances() { for _, a := range AcctMgr.AllAccounts() { balance := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - balance @@ -2687,49 +2687,3 @@ func (s *rpcServer) NotifyNewTxDetails(account string, details btcjson.ListTrans } s.broadcasts <- mntfn } - -// NotifiedRecvTxRequest is used to check whether the outpoint of -// a received transaction has already been notified due to -// arriving first in the btcd mempool. -type NotifiedRecvTxRequest struct { - op btcwire.OutPoint - response chan NotifiedRecvTxResponse -} - -// NotifiedRecvTxResponse is the response of a NotifiedRecvTxRequest -// request. -type NotifiedRecvTxResponse bool - -// NotifiedRecvTxChans holds the channels to manage -// StoreNotifiedMempoolTxs. -var NotifiedRecvTxChans = struct { - add, remove chan btcwire.OutPoint - access chan NotifiedRecvTxRequest -}{ - add: make(chan btcwire.OutPoint), - remove: make(chan btcwire.OutPoint), - access: make(chan NotifiedRecvTxRequest), -} - -// StoreNotifiedMempoolRecvTxs maintains a set of previously-sent -// received transaction notifications originating from the btcd -// mempool. This is used to prevent duplicate client transaction -// notifications once a mempool tx is mined into a block. -func StoreNotifiedMempoolRecvTxs(add, remove chan btcwire.OutPoint, - access chan NotifiedRecvTxRequest) { - - m := make(map[btcwire.OutPoint]struct{}) - for { - select { - case op := <-add: - m[op] = struct{}{} - - case op := <-remove: - delete(m, op) - - case req := <-access: - _, ok := m[req.op] - req.response <- NotifiedRecvTxResponse(ok) - } - } -} diff --git a/txstore/serialization.go b/txstore/serialization.go index 3935a90..3a4b842 100644 --- a/txstore/serialization.go +++ b/txstore/serialization.go @@ -447,12 +447,12 @@ func (t *txRecord) ReadFrom(r io.Reader) (int64, error) { // For each expected output key, allocate and read the key, // appending the result to the spends slice. This slice is - // originally set to nil (*not* preallocated to spendsCount + // originally set empty (*not* preallocated to spendsCount // size) to prevent accidentally allocating so much memory that // the process dies. - var spends []*BlockOutputKey + var spends []BlockOutputKey for i := uint32(0); i < spendsCount; i++ { - k := &BlockOutputKey{} + k := BlockOutputKey{} tmpn64, err := k.ReadFrom(r) n64 += tmpn64 if err != nil { diff --git a/txstore/tx.go b/txstore/tx.go index 00b7549..5c547bf 100644 --- a/txstore/tx.go +++ b/txstore/tx.go @@ -244,7 +244,7 @@ type txRecord struct { // transaction credits. type debits struct { amount btcutil.Amount - spends []*BlockOutputKey + spends []BlockOutputKey } // credit describes a transaction output which was or is spendable by wallet. @@ -404,37 +404,6 @@ func (u *unconfirmedStore) txRecordForInserts(tx *btcutil.Tx) *txRecord { return r } -func (r *txRecord) setDebitsSpends(spends []*BlockOutputKey, tx *btcutil.Tx) error { - if r.debits.spends != nil { - if *r.tx.Sha() == *tx.Sha() { - return ErrDuplicateInsert - } - return ErrInconsistentStore - } - r.debits.spends = spends - return nil -} - -func (r *txRecord) setCredit(c *credit, index uint32, tx *btcutil.Tx) error { - if len(r.credits) <= int(index) { - r.credits = extendCredits(r.credits, index) - } - if r.credits[index] != nil { - if *r.tx.Sha() == *tx.Sha() { - return ErrDuplicateInsert - } - return ErrInconsistentStore - } - - r.credits[index] = c - return nil -} - -func extendCredits(c []*credit, index uint32) []*credit { - missing := make([]*credit, int(index+1)-len(c)) - return append(c, missing...) -} - func (s *Store) moveMinedTx(r *txRecord, block *Block) error { delete(s.unconfirmed.txs, *r.Tx().Sha()) @@ -465,12 +434,9 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error { if err != nil { return err } - if len(rr.credits) <= int(prev.OutputIndex) { - rr.credits = extendCredits(rr.credits, prev.OutputIndex) - } rr.credits[prev.OutputIndex].spentBy = &key // debits should already be non-nil - r.debits.spends = append(r.debits.spends, &prev) + r.debits.spends = append(r.debits.spends, prev) } // For each credit in r, if the credit is spent by another unconfirmed @@ -483,20 +449,21 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error { // If the credit is not spent, modify the store's unspent bookkeeping // maps to include the credit and increment the amount deltas by the // credit's value. + op := btcwire.OutPoint{Hash: *r.Tx().Sha()} for i, credit := range r.credits { if credit == nil { continue } - op := btcwire.NewOutPoint(r.Tx().Sha(), uint32(i)) - outputKey := BlockOutputKey{key, uint32(i)} - if rr, ok := s.unconfirmed.spentUnconfirmed[*op]; ok { - delete(s.unconfirmed.spentUnconfirmed, *op) - s.unconfirmed.spentBlockOutPointKeys[*op] = outputKey + op.Index = uint32(i) + outputKey := BlockOutputKey{key, op.Index} + if rr, ok := s.unconfirmed.spentUnconfirmed[op]; ok { + delete(s.unconfirmed.spentUnconfirmed, op) + s.unconfirmed.spentBlockOutPointKeys[op] = outputKey s.unconfirmed.spentBlockOutPoints[outputKey] = rr credit.spentBy = &BlockTxKey{BlockHeight: -1} } else if credit.spentBy == nil { // Mark outpoint unspent. - s.unspent[*op] = key + s.unspent[op] = key // Increment spendable amount delta as a result of // moving this credit to this block. @@ -541,10 +508,9 @@ func (s *Store) InsertTx(tx *btcutil.Tx, block *Block) (*TxRecord, error) { // Simply create or return the transaction record if this transaction // is unconfirmed. if block == nil { - key := BlockTxKey{BlockHeight: -1} r := s.unconfirmed.txRecordForInserts(tx) r.received = received - return &TxRecord{key, r, s}, nil + return &TxRecord{BlockTxKey{BlockHeight: -1}, r, s}, nil } // Check if block records already exist for this tx. If so, @@ -641,24 +607,18 @@ func (t *TxRecord) AddDebits(spent []Credit) (Debits, error) { t.s.unconfirmed.spentUnconfirmed[*op] = t.txRecord default: key := c.outputKey() - t.s.unconfirmed.spentBlockOutPointKeys[*op] = *key - t.s.unconfirmed.spentBlockOutPoints[*key] = t.txRecord + t.s.unconfirmed.spentBlockOutPointKeys[*op] = key + t.s.unconfirmed.spentBlockOutPoints[key] = t.txRecord } } default: if t.debits.spends == nil { - prevOutputKeys := make([]*BlockOutputKey, 0, len(spent)) - for _, c := range spent { - prevOutputKeys = append(prevOutputKeys, c.outputKey()) - } - err := t.txRecord.setDebitsSpends(prevOutputKeys, t.tx) - if err != nil { - if err == ErrDuplicateInsert { - return Debits{t}, nil - } - return Debits{}, err + prevOutputKeys := make([]BlockOutputKey, len(spent)) + for i, c := range spent { + prevOutputKeys[i] = c.outputKey() } + t.txRecord.debits.spends = prevOutputKeys } } return Debits{t}, nil @@ -728,10 +688,9 @@ func (s *Store) markOutputsSpent(spent []Credit, t *TxRecord) (btcutil.Amount, e credit.spentBy = &t.BlockTxKey delete(s.unspent, *op) if t.BlockHeight == -1 { // unconfirmed - op := prev.OutPoint() key := prev.outputKey() - s.unconfirmed.spentBlockOutPointKeys[*op] = *key - s.unconfirmed.spentBlockOutPoints[*key] = t.txRecord + s.unconfirmed.spentBlockOutPointKeys[*op] = key + s.unconfirmed.spentBlockOutPoints[key] = t.txRecord } // Increment total debited amount. @@ -752,6 +711,23 @@ func (s *Store) markOutputsSpent(spent []Credit, t *TxRecord) (btcutil.Amount, e return a, nil } +func (r *txRecord) setCredit(index uint32, change bool, tx *btcutil.Tx) error { + if r.credits == nil { + r.credits = make([]*credit, 0, len(tx.MsgTx().TxOut)) + } + for i := uint32(len(r.credits)); i <= index; i++ { + r.credits = append(r.credits, nil) + } + if r.credits[index] != nil { + if *r.tx.Sha() == *tx.Sha() { + return ErrDuplicateInsert + } + return ErrInconsistentStore + } + r.credits[index] = &credit{change: change} + return nil +} + // AddCredit marks the transaction record as containing a transaction output // spendable by wallet. The output is added unspent, and is marked spent // when a new transaction spending the output is inserted into the store. @@ -760,8 +736,7 @@ func (t *TxRecord) AddCredit(index uint32, change bool) (Credit, error) { return Credit{}, errors.New("transaction output does not exist") } - c := &credit{change: change} - if err := t.txRecord.setCredit(c, index, t.tx); err != nil { + if err := t.txRecord.setCredit(index, change, t.tx); err != nil { if err == ErrDuplicateInsert { return Credit{t, index}, nil } @@ -873,7 +848,7 @@ func (s *Store) Rollback(height int32) error { current := BlockOutputKey{ BlockTxKey: BlockTxKey{BlockHeight: -1}, } - err = spender.swapDebits(&prev, ¤t) + err = spender.swapDebits(prev, current) if err != nil { return err } @@ -890,7 +865,7 @@ func (s *Store) Rollback(height int32) error { if err != nil { return err } - c, err := rr.lookupBlockCredit(*prev) + c, err := rr.lookupBlockCredit(prev) if err != nil { return err } @@ -898,8 +873,8 @@ func (s *Store) Rollback(height int32) error { Hash: *rr.Tx().Sha(), Index: prev.OutputIndex, } - s.unconfirmed.spentBlockOutPointKeys[op] = *prev - s.unconfirmed.spentBlockOutPoints[*prev] = r + s.unconfirmed.spentBlockOutPointKeys[op] = prev + s.unconfirmed.spentBlockOutPoints[prev] = r c.spentBy = &BlockTxKey{BlockHeight: -1} } @@ -913,18 +888,15 @@ func (s *Store) Rollback(height int32) error { return nil } -func (r *txRecord) swapDebits(previous, current *BlockOutputKey) error { +func (r *txRecord) swapDebits(previous, current BlockOutputKey) error { for i, outputKey := range r.debits.spends { - if outputKey == nil { - continue - } - if *outputKey == *previous { + if outputKey == previous { r.debits.spends[i] = current return nil } } - return MissingCreditError(*previous) + return MissingCreditError(previous) } // UnminedDebitTxs returns the underlying transactions for all wallet @@ -1034,7 +1006,7 @@ func (s *Store) UnspentOutputs() ([]Credit, error) { i := 0 for op, key := range s.unspent { creditChans[i] = make(chan createdCredit) - go func(i int, opIndex uint32) { + go func(i int, key BlockTxKey, opIndex uint32) { r, err := s.lookupBlockTx(key) if err != nil { creditChans[i] <- createdCredit{err: err} @@ -1043,7 +1015,7 @@ func (s *Store) UnspentOutputs() ([]Credit, error) { t := &TxRecord{key, r, s} c := Credit{t, opIndex} creditChans[i] <- createdCredit{credit: c} - }(i, op.Index) + }(i, key, op.Index) i++ } @@ -1276,6 +1248,15 @@ func (t *TxRecord) Credits() []Credit { return credits } +// HasCredit returns whether the transaction output at the passed index is +// a wallet credit. +func (t *TxRecord) HasCredit(i int) bool { + if len(t.credits) <= i { + return false + } + return t.credits[i] != nil +} + // InputAmount returns the total amount debited from previous credits. func (d Debits) InputAmount() btcutil.Amount { return d.txRecord.debits.amount @@ -1347,8 +1328,8 @@ func (c Credit) OutPoint() *btcwire.OutPoint { } // outputKey creates and returns the block lookup key for this credit. -func (c Credit) outputKey() *BlockOutputKey { - return &BlockOutputKey{ +func (c Credit) outputKey() BlockOutputKey { + return BlockOutputKey{ BlockTxKey: c.BlockTxKey, OutputIndex: c.OutputIndex, }