Fix various issues found by profiling.

This commit is the result of inspecting the results of both cpu and
memory profiling, to improve areas where wallet can be more efficient
on transaction inserts.

One problem that's very evident by profiling is how much waiting there
is for file (txstore, wallet) writes.  This commit does not attempt to
fix this yet, but focuses on the easier-to-fix memory allocation
issues which can slow down the rest of wallet due to excessive garbage
collection scanning.

While here, fix a race where a closure run as a goroutine was closing
over a range iterator.
This commit is contained in:
Josh Rickmar 2014-06-18 16:46:18 -05:00
parent a87f827fb9
commit 632148ed55
5 changed files with 88 additions and 172 deletions

5
cmd.go
View file

@ -181,11 +181,6 @@ func main() {
// Start HTTP server to serve wallet client connections. // Start HTTP server to serve wallet client connections.
server.Start() server.Start()
// Begin maintanence goroutines.
go StoreNotifiedMempoolRecvTxs(NotifiedRecvTxChans.add,
NotifiedRecvTxChans.remove,
NotifiedRecvTxChans.access)
// Start client connection to a btcd chain server. Attempt // Start client connection to a btcd chain server. Attempt
// reconnections if the client could not be successfully connected. // reconnections if the client could not be successfully connected.
clientChan := make(chan *rpcClient) clientChan := make(chan *rpcClient)

View file

@ -185,10 +185,12 @@ func (n recvTx) handleNotification() error {
AcctMgr.Grab() AcctMgr.Grab()
defer AcctMgr.Release() defer AcctMgr.Release()
// For every output, find all accounts handling that output address (if any) // For every output, if it pays to a wallet address, insert the
// and record the received txout. // transaction into the store (possibly moving it from unconfirmed to
for outIdx, txout := range n.tx.MsgTx().TxOut { // confirmed), and add a credit record if one does not already exist.
var accounts []*Account var txr *txstore.TxRecord
txInserted := false
for i, txout := range n.tx.MsgTx().TxOut {
// Errors don't matter here. If addrs is nil, the range below // Errors don't matter here. If addrs is nil, the range below
// does nothing. // does nothing.
_, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript, _, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript,
@ -196,53 +198,37 @@ func (n recvTx) handleNotification() error {
for _, addr := range addrs { for _, addr := range addrs {
a, err := AcctMgr.AccountByAddress(addr) a, err := AcctMgr.AccountByAddress(addr)
if err != nil { if err != nil {
continue continue // try next address, if any
}
accounts = append(accounts, a)
} }
for _, a := range accounts { if !txInserted {
txr, err := a.TxStore.InsertTx(n.tx, block) txr, err = a.TxStore.InsertTx(n.tx, block)
if err != nil { if err != nil {
return err return err
} }
cred, err := txr.AddCredit(uint32(outIdx), false) txInserted = true
}
// Insert and notify websocket clients of the credit if it is
// not a duplicate, otherwise, check the next txout if the
// credit has already been inserted.
if txr.HasCredit(i) {
break
}
cred, err := txr.AddCredit(uint32(i), false)
if err != nil { if err != nil {
return err return err
} }
AcctMgr.ds.ScheduleTxStoreWrite(a) AcctMgr.ds.ScheduleTxStoreWrite(a)
// Notify wallet clients of tx. If the tx is unconfirmed, it is always
// notified and the outpoint is marked as notified. If the outpoint
// has already been notified and is now in a block, a txmined notifiction
// should be sent once to let wallet clients that all previous send/recvs
// for this unconfirmed tx are now confirmed.
op := *cred.OutPoint()
previouslyNotifiedReq := NotifiedRecvTxRequest{
op: op,
response: make(chan NotifiedRecvTxResponse),
}
NotifiedRecvTxChans.access <- previouslyNotifiedReq
if <-previouslyNotifiedReq.response {
NotifiedRecvTxChans.remove <- op
} else {
// Notify clients of new recv tx and mark as notified.
NotifiedRecvTxChans.add <- op
ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net())
if err != nil { if err != nil {
return err return err
} }
server.NotifyNewTxDetails(a.Name(), ltr) server.NotifyNewTxDetails(a.Name(), ltr)
} break // check whether next txout is a wallet txout
// Notify clients of new account balance.
confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed
server.NotifyWalletBalance(a.name, confirmed)
server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed)
} }
} }
server.NotifyBalances()
return nil return nil
} }
@ -462,7 +448,7 @@ func (c *rpcClient) Handshake() error {
} }
if server != nil { if server != nil {
server.NotifyNewBlockChainHeight(&bs) server.NotifyNewBlockChainHeight(&bs)
server.NotifyBalances(nil) server.NotifyBalances()
} }
// Get default account. Only the default account is used to // Get default account. Only the default account is used to

View file

@ -1369,7 +1369,7 @@ func (s *rpcServer) NotifyNewBlockChainHeight(bs *wallet.BlockStamp) {
// TODO(jrick): Switch this to return a single JSON object // TODO(jrick): Switch this to return a single JSON object
// (map[string]interface{}) of all accounts and their balances, instead of // (map[string]interface{}) of all accounts and their balances, instead of
// separate notifications for each account. // separate notifications for each account.
func (s *rpcServer) NotifyBalances(wsc *websocketClient) { func (s *rpcServer) NotifyBalances() {
for _, a := range AcctMgr.AllAccounts() { for _, a := range AcctMgr.AllAccounts() {
balance := a.CalculateBalance(1) balance := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - balance unconfirmed := a.CalculateBalance(0) - balance
@ -2687,49 +2687,3 @@ func (s *rpcServer) NotifyNewTxDetails(account string, details btcjson.ListTrans
} }
s.broadcasts <- mntfn s.broadcasts <- mntfn
} }
// NotifiedRecvTxRequest is used to check whether the outpoint of
// a received transaction has already been notified due to
// arriving first in the btcd mempool.
type NotifiedRecvTxRequest struct {
op btcwire.OutPoint
response chan NotifiedRecvTxResponse
}
// NotifiedRecvTxResponse is the response of a NotifiedRecvTxRequest
// request.
type NotifiedRecvTxResponse bool
// NotifiedRecvTxChans holds the channels to manage
// StoreNotifiedMempoolTxs.
var NotifiedRecvTxChans = struct {
add, remove chan btcwire.OutPoint
access chan NotifiedRecvTxRequest
}{
add: make(chan btcwire.OutPoint),
remove: make(chan btcwire.OutPoint),
access: make(chan NotifiedRecvTxRequest),
}
// StoreNotifiedMempoolRecvTxs maintains a set of previously-sent
// received transaction notifications originating from the btcd
// mempool. This is used to prevent duplicate client transaction
// notifications once a mempool tx is mined into a block.
func StoreNotifiedMempoolRecvTxs(add, remove chan btcwire.OutPoint,
access chan NotifiedRecvTxRequest) {
m := make(map[btcwire.OutPoint]struct{})
for {
select {
case op := <-add:
m[op] = struct{}{}
case op := <-remove:
delete(m, op)
case req := <-access:
_, ok := m[req.op]
req.response <- NotifiedRecvTxResponse(ok)
}
}
}

View file

@ -447,12 +447,12 @@ func (t *txRecord) ReadFrom(r io.Reader) (int64, error) {
// For each expected output key, allocate and read the key, // For each expected output key, allocate and read the key,
// appending the result to the spends slice. This slice is // appending the result to the spends slice. This slice is
// originally set to nil (*not* preallocated to spendsCount // originally set empty (*not* preallocated to spendsCount
// size) to prevent accidentally allocating so much memory that // size) to prevent accidentally allocating so much memory that
// the process dies. // the process dies.
var spends []*BlockOutputKey var spends []BlockOutputKey
for i := uint32(0); i < spendsCount; i++ { for i := uint32(0); i < spendsCount; i++ {
k := &BlockOutputKey{} k := BlockOutputKey{}
tmpn64, err := k.ReadFrom(r) tmpn64, err := k.ReadFrom(r)
n64 += tmpn64 n64 += tmpn64
if err != nil { if err != nil {

View file

@ -244,7 +244,7 @@ type txRecord struct {
// transaction credits. // transaction credits.
type debits struct { type debits struct {
amount btcutil.Amount amount btcutil.Amount
spends []*BlockOutputKey spends []BlockOutputKey
} }
// credit describes a transaction output which was or is spendable by wallet. // credit describes a transaction output which was or is spendable by wallet.
@ -404,37 +404,6 @@ func (u *unconfirmedStore) txRecordForInserts(tx *btcutil.Tx) *txRecord {
return r return r
} }
func (r *txRecord) setDebitsSpends(spends []*BlockOutputKey, tx *btcutil.Tx) error {
if r.debits.spends != nil {
if *r.tx.Sha() == *tx.Sha() {
return ErrDuplicateInsert
}
return ErrInconsistentStore
}
r.debits.spends = spends
return nil
}
func (r *txRecord) setCredit(c *credit, index uint32, tx *btcutil.Tx) error {
if len(r.credits) <= int(index) {
r.credits = extendCredits(r.credits, index)
}
if r.credits[index] != nil {
if *r.tx.Sha() == *tx.Sha() {
return ErrDuplicateInsert
}
return ErrInconsistentStore
}
r.credits[index] = c
return nil
}
func extendCredits(c []*credit, index uint32) []*credit {
missing := make([]*credit, int(index+1)-len(c))
return append(c, missing...)
}
func (s *Store) moveMinedTx(r *txRecord, block *Block) error { func (s *Store) moveMinedTx(r *txRecord, block *Block) error {
delete(s.unconfirmed.txs, *r.Tx().Sha()) delete(s.unconfirmed.txs, *r.Tx().Sha())
@ -465,12 +434,9 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error {
if err != nil { if err != nil {
return err return err
} }
if len(rr.credits) <= int(prev.OutputIndex) {
rr.credits = extendCredits(rr.credits, prev.OutputIndex)
}
rr.credits[prev.OutputIndex].spentBy = &key rr.credits[prev.OutputIndex].spentBy = &key
// debits should already be non-nil // debits should already be non-nil
r.debits.spends = append(r.debits.spends, &prev) r.debits.spends = append(r.debits.spends, prev)
} }
// For each credit in r, if the credit is spent by another unconfirmed // For each credit in r, if the credit is spent by another unconfirmed
@ -483,20 +449,21 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error {
// If the credit is not spent, modify the store's unspent bookkeeping // If the credit is not spent, modify the store's unspent bookkeeping
// maps to include the credit and increment the amount deltas by the // maps to include the credit and increment the amount deltas by the
// credit's value. // credit's value.
op := btcwire.OutPoint{Hash: *r.Tx().Sha()}
for i, credit := range r.credits { for i, credit := range r.credits {
if credit == nil { if credit == nil {
continue continue
} }
op := btcwire.NewOutPoint(r.Tx().Sha(), uint32(i)) op.Index = uint32(i)
outputKey := BlockOutputKey{key, uint32(i)} outputKey := BlockOutputKey{key, op.Index}
if rr, ok := s.unconfirmed.spentUnconfirmed[*op]; ok { if rr, ok := s.unconfirmed.spentUnconfirmed[op]; ok {
delete(s.unconfirmed.spentUnconfirmed, *op) delete(s.unconfirmed.spentUnconfirmed, op)
s.unconfirmed.spentBlockOutPointKeys[*op] = outputKey s.unconfirmed.spentBlockOutPointKeys[op] = outputKey
s.unconfirmed.spentBlockOutPoints[outputKey] = rr s.unconfirmed.spentBlockOutPoints[outputKey] = rr
credit.spentBy = &BlockTxKey{BlockHeight: -1} credit.spentBy = &BlockTxKey{BlockHeight: -1}
} else if credit.spentBy == nil { } else if credit.spentBy == nil {
// Mark outpoint unspent. // Mark outpoint unspent.
s.unspent[*op] = key s.unspent[op] = key
// Increment spendable amount delta as a result of // Increment spendable amount delta as a result of
// moving this credit to this block. // moving this credit to this block.
@ -541,10 +508,9 @@ func (s *Store) InsertTx(tx *btcutil.Tx, block *Block) (*TxRecord, error) {
// Simply create or return the transaction record if this transaction // Simply create or return the transaction record if this transaction
// is unconfirmed. // is unconfirmed.
if block == nil { if block == nil {
key := BlockTxKey{BlockHeight: -1}
r := s.unconfirmed.txRecordForInserts(tx) r := s.unconfirmed.txRecordForInserts(tx)
r.received = received r.received = received
return &TxRecord{key, r, s}, nil return &TxRecord{BlockTxKey{BlockHeight: -1}, r, s}, nil
} }
// Check if block records already exist for this tx. If so, // Check if block records already exist for this tx. If so,
@ -641,24 +607,18 @@ func (t *TxRecord) AddDebits(spent []Credit) (Debits, error) {
t.s.unconfirmed.spentUnconfirmed[*op] = t.txRecord t.s.unconfirmed.spentUnconfirmed[*op] = t.txRecord
default: default:
key := c.outputKey() key := c.outputKey()
t.s.unconfirmed.spentBlockOutPointKeys[*op] = *key t.s.unconfirmed.spentBlockOutPointKeys[*op] = key
t.s.unconfirmed.spentBlockOutPoints[*key] = t.txRecord t.s.unconfirmed.spentBlockOutPoints[key] = t.txRecord
} }
} }
default: default:
if t.debits.spends == nil { if t.debits.spends == nil {
prevOutputKeys := make([]*BlockOutputKey, 0, len(spent)) prevOutputKeys := make([]BlockOutputKey, len(spent))
for _, c := range spent { for i, c := range spent {
prevOutputKeys = append(prevOutputKeys, c.outputKey()) prevOutputKeys[i] = c.outputKey()
}
err := t.txRecord.setDebitsSpends(prevOutputKeys, t.tx)
if err != nil {
if err == ErrDuplicateInsert {
return Debits{t}, nil
}
return Debits{}, err
} }
t.txRecord.debits.spends = prevOutputKeys
} }
} }
return Debits{t}, nil return Debits{t}, nil
@ -728,10 +688,9 @@ func (s *Store) markOutputsSpent(spent []Credit, t *TxRecord) (btcutil.Amount, e
credit.spentBy = &t.BlockTxKey credit.spentBy = &t.BlockTxKey
delete(s.unspent, *op) delete(s.unspent, *op)
if t.BlockHeight == -1 { // unconfirmed if t.BlockHeight == -1 { // unconfirmed
op := prev.OutPoint()
key := prev.outputKey() key := prev.outputKey()
s.unconfirmed.spentBlockOutPointKeys[*op] = *key s.unconfirmed.spentBlockOutPointKeys[*op] = key
s.unconfirmed.spentBlockOutPoints[*key] = t.txRecord s.unconfirmed.spentBlockOutPoints[key] = t.txRecord
} }
// Increment total debited amount. // Increment total debited amount.
@ -752,6 +711,23 @@ func (s *Store) markOutputsSpent(spent []Credit, t *TxRecord) (btcutil.Amount, e
return a, nil return a, nil
} }
func (r *txRecord) setCredit(index uint32, change bool, tx *btcutil.Tx) error {
if r.credits == nil {
r.credits = make([]*credit, 0, len(tx.MsgTx().TxOut))
}
for i := uint32(len(r.credits)); i <= index; i++ {
r.credits = append(r.credits, nil)
}
if r.credits[index] != nil {
if *r.tx.Sha() == *tx.Sha() {
return ErrDuplicateInsert
}
return ErrInconsistentStore
}
r.credits[index] = &credit{change: change}
return nil
}
// AddCredit marks the transaction record as containing a transaction output // AddCredit marks the transaction record as containing a transaction output
// spendable by wallet. The output is added unspent, and is marked spent // spendable by wallet. The output is added unspent, and is marked spent
// when a new transaction spending the output is inserted into the store. // when a new transaction spending the output is inserted into the store.
@ -760,8 +736,7 @@ func (t *TxRecord) AddCredit(index uint32, change bool) (Credit, error) {
return Credit{}, errors.New("transaction output does not exist") return Credit{}, errors.New("transaction output does not exist")
} }
c := &credit{change: change} if err := t.txRecord.setCredit(index, change, t.tx); err != nil {
if err := t.txRecord.setCredit(c, index, t.tx); err != nil {
if err == ErrDuplicateInsert { if err == ErrDuplicateInsert {
return Credit{t, index}, nil return Credit{t, index}, nil
} }
@ -873,7 +848,7 @@ func (s *Store) Rollback(height int32) error {
current := BlockOutputKey{ current := BlockOutputKey{
BlockTxKey: BlockTxKey{BlockHeight: -1}, BlockTxKey: BlockTxKey{BlockHeight: -1},
} }
err = spender.swapDebits(&prev, &current) err = spender.swapDebits(prev, current)
if err != nil { if err != nil {
return err return err
} }
@ -890,7 +865,7 @@ func (s *Store) Rollback(height int32) error {
if err != nil { if err != nil {
return err return err
} }
c, err := rr.lookupBlockCredit(*prev) c, err := rr.lookupBlockCredit(prev)
if err != nil { if err != nil {
return err return err
} }
@ -898,8 +873,8 @@ func (s *Store) Rollback(height int32) error {
Hash: *rr.Tx().Sha(), Hash: *rr.Tx().Sha(),
Index: prev.OutputIndex, Index: prev.OutputIndex,
} }
s.unconfirmed.spentBlockOutPointKeys[op] = *prev s.unconfirmed.spentBlockOutPointKeys[op] = prev
s.unconfirmed.spentBlockOutPoints[*prev] = r s.unconfirmed.spentBlockOutPoints[prev] = r
c.spentBy = &BlockTxKey{BlockHeight: -1} c.spentBy = &BlockTxKey{BlockHeight: -1}
} }
@ -913,18 +888,15 @@ func (s *Store) Rollback(height int32) error {
return nil return nil
} }
func (r *txRecord) swapDebits(previous, current *BlockOutputKey) error { func (r *txRecord) swapDebits(previous, current BlockOutputKey) error {
for i, outputKey := range r.debits.spends { for i, outputKey := range r.debits.spends {
if outputKey == nil { if outputKey == previous {
continue
}
if *outputKey == *previous {
r.debits.spends[i] = current r.debits.spends[i] = current
return nil return nil
} }
} }
return MissingCreditError(*previous) return MissingCreditError(previous)
} }
// UnminedDebitTxs returns the underlying transactions for all wallet // UnminedDebitTxs returns the underlying transactions for all wallet
@ -1034,7 +1006,7 @@ func (s *Store) UnspentOutputs() ([]Credit, error) {
i := 0 i := 0
for op, key := range s.unspent { for op, key := range s.unspent {
creditChans[i] = make(chan createdCredit) creditChans[i] = make(chan createdCredit)
go func(i int, opIndex uint32) { go func(i int, key BlockTxKey, opIndex uint32) {
r, err := s.lookupBlockTx(key) r, err := s.lookupBlockTx(key)
if err != nil { if err != nil {
creditChans[i] <- createdCredit{err: err} creditChans[i] <- createdCredit{err: err}
@ -1043,7 +1015,7 @@ func (s *Store) UnspentOutputs() ([]Credit, error) {
t := &TxRecord{key, r, s} t := &TxRecord{key, r, s}
c := Credit{t, opIndex} c := Credit{t, opIndex}
creditChans[i] <- createdCredit{credit: c} creditChans[i] <- createdCredit{credit: c}
}(i, op.Index) }(i, key, op.Index)
i++ i++
} }
@ -1276,6 +1248,15 @@ func (t *TxRecord) Credits() []Credit {
return credits return credits
} }
// HasCredit returns whether the transaction output at the passed index is
// a wallet credit.
func (t *TxRecord) HasCredit(i int) bool {
if len(t.credits) <= i {
return false
}
return t.credits[i] != nil
}
// InputAmount returns the total amount debited from previous credits. // InputAmount returns the total amount debited from previous credits.
func (d Debits) InputAmount() btcutil.Amount { func (d Debits) InputAmount() btcutil.Amount {
return d.txRecord.debits.amount return d.txRecord.debits.amount
@ -1347,8 +1328,8 @@ func (c Credit) OutPoint() *btcwire.OutPoint {
} }
// outputKey creates and returns the block lookup key for this credit. // outputKey creates and returns the block lookup key for this credit.
func (c Credit) outputKey() *BlockOutputKey { func (c Credit) outputKey() BlockOutputKey {
return &BlockOutputKey{ return BlockOutputKey{
BlockTxKey: c.BlockTxKey, BlockTxKey: c.BlockTxKey,
OutputIndex: c.OutputIndex, OutputIndex: c.OutputIndex,
} }