From 019df772b1195bb186b239575cf61b42abb63001 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Thu, 5 Sep 2013 11:19:48 -0400 Subject: [PATCH] Use a single handler (per wallet) for all tx notifications. --- cmd.go | 242 +++++++++++++++++++++++++++++------------------------ sockets.go | 2 +- 2 files changed, 132 insertions(+), 112 deletions(-) diff --git a/cmd.go b/cmd.go index 7679192..c023c49 100644 --- a/cmd.go +++ b/cmd.go @@ -79,9 +79,10 @@ func main() { type BtcWallet struct { *wallet.Wallet - mtx sync.RWMutex - dirty bool - UtxoStore struct { + mtx sync.RWMutex + dirty bool + NewBlockTxSeqN uint64 + UtxoStore struct { sync.RWMutex dirty bool s tx.UtxoStore @@ -183,13 +184,35 @@ func OpenWallet(cfg *config, account string) (*BtcWallet, error) { } func (w *BtcWallet) Track() { - wallets.Lock() - name := w.Name() - if wallets.m[name] == nil { - wallets.m[name] = w - } - wallets.Unlock() + seq.Lock() + n := seq.n + seq.n++ + seq.Unlock() + // Use goroutines and a WaitGroup to prevent unnecessary waiting for + // released locks. + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + wallets.Lock() + name := w.Name() + if wallets.m[name] == nil { + wallets.m[name] = w + } + wallets.Unlock() + }() + go func() { + defer wg.Done() + w.mtx.Lock() + w.NewBlockTxSeqN = n + w.mtx.Unlock() + }() + wg.Wait() + + replyHandlers.Lock() + replyHandlers.m[n] = w.NewBlockTxHandler + replyHandlers.Unlock() for _, addr := range w.GetActiveAddresses() { go w.ReqNewTxsForAddress(addr) } @@ -231,10 +254,9 @@ func (w *BtcWallet) RescanForAddress(addr string, blocks ...int) { } func (w *BtcWallet) ReqNewTxsForAddress(addr string) { - seq.Lock() - n := seq.n - seq.n++ - seq.Unlock() + w.mtx.RLock() + n := w.NewBlockTxSeqN + w.mtx.RUnlock() m := &btcjson.Message{ Jsonrpc: "1.0", @@ -244,103 +266,101 @@ func (w *BtcWallet) ReqNewTxsForAddress(addr string) { } msg, _ := json.Marshal(m) - replyHandlers.Lock() - replyHandlers.m[n] = func(result interface{}) bool { - // TODO(jrick): btcd also sends the block hash in the reply. - // Do we want it saved as well? - v, ok := result.(map[string]interface{}) - if !ok { - log.Error("Tx Handler: Unexpected result type.") - return false - } - sender58, ok := v["sender"].(string) - if !ok { - log.Error("Tx Handler: Unspecified sender.") - return false - } - receiver58, ok := v["receiver"].(string) - if !ok { - log.Error("Tx Handler: Unspecified receiver.") - return false - } - height, ok := v["height"].(float64) - if !ok { - log.Error("Tx Handler: Unspecified height.") - return false - } - txhashBE, ok := v["txhash"].(string) - if !ok { - log.Error("Tx Handler: Unspecified transaction hash.") - return false - } - index, ok := v["index"].(float64) - if !ok { - log.Error("Tx Handler: Unspecified transaction index.") - return false - } - amt, ok := v["amount"].(float64) - if !ok { - log.Error("Tx Handler: Unspecified amount.") - return false - } - spent, ok := v["spent"].(bool) - if !ok { - log.Error("Tx Handler: Unspecified spent field.") - return false - } - - // btcd sends the tx hashe as a BE string. Convert to a - // LE ShaHash. - txhash, err := btcwire.NewShaHashFromStr(txhashBE) - if err != nil { - log.Error("Tx Handler: Tx hash string cannot be parsed: " + err.Error()) - return false - } - - sender := btcutil.Base58Decode(sender58) - receiver := btcutil.Base58Decode(receiver58) - - go func() { - t := &tx.RecvTx{ - Amt: int64(amt), - } - copy(t.TxHash[:], txhash[:]) - copy(t.SenderAddr[:], sender) - copy(t.ReceiverAddr[:], receiver) - - w.TxStore.Lock() - txs := w.TxStore.s - w.TxStore.s = append(txs, t) - w.TxStore.dirty = true - w.TxStore.Unlock() - }() - - go func() { - // Do not add output to utxo store if spent. - if spent { - return - } - - u := &tx.Utxo{ - Amt: int64(amt), - Height: int64(height), - } - copy(u.Out.Hash[:], txhash[:]) - u.Out.Index = uint32(index) - copy(u.Addr[:], receiver) - - w.UtxoStore.Lock() - // All newly saved utxos are first classified as unconfirmed. - utxos := w.UtxoStore.s.Unconfirmed - w.UtxoStore.s.Unconfirmed = append(utxos, u) - w.UtxoStore.dirty = true - w.UtxoStore.Unlock() - }() - - // Never remove this handler. - return false - } - replyHandlers.Unlock() - btcdMsgs <- msg } + +func (w *BtcWallet) NewBlockTxHandler(result interface{}) bool { + // TODO(jrick): btcd also sends the block hash in the reply. + // Do we want it saved as well? + v, ok := result.(map[string]interface{}) + if !ok { + log.Error("Tx Handler: Unexpected result type.") + return false + } + sender58, ok := v["sender"].(string) + if !ok { + log.Error("Tx Handler: Unspecified sender.") + return false + } + receiver58, ok := v["receiver"].(string) + if !ok { + log.Error("Tx Handler: Unspecified receiver.") + return false + } + height, ok := v["height"].(float64) + if !ok { + log.Error("Tx Handler: Unspecified height.") + return false + } + txhashBE, ok := v["txhash"].(string) + if !ok { + log.Error("Tx Handler: Unspecified transaction hash.") + return false + } + index, ok := v["index"].(float64) + if !ok { + log.Error("Tx Handler: Unspecified transaction index.") + return false + } + amt, ok := v["amount"].(float64) + if !ok { + log.Error("Tx Handler: Unspecified amount.") + return false + } + spent, ok := v["spent"].(bool) + if !ok { + log.Error("Tx Handler: Unspecified spent field.") + return false + } + + // btcd sends the tx hash as a BE string. Convert to a + // LE ShaHash. + txhash, err := btcwire.NewShaHashFromStr(txhashBE) + if err != nil { + log.Error("Tx Handler: Tx hash string cannot be parsed: " + err.Error()) + return false + } + + sender := btcutil.Base58Decode(sender58) + receiver := btcutil.Base58Decode(receiver58) + + go func() { + t := &tx.RecvTx{ + Amt: int64(amt), + } + copy(t.TxHash[:], txhash[:]) + copy(t.SenderAddr[:], sender) + copy(t.ReceiverAddr[:], receiver) + + w.TxStore.Lock() + txs := w.TxStore.s + w.TxStore.s = append(txs, t) + w.TxStore.dirty = true + w.TxStore.Unlock() + }() + + go func() { + // Do not add output to utxo store if spent. + if spent { + return + } + + u := &tx.Utxo{ + Amt: int64(amt), + Height: int64(height), + } + copy(u.Out.Hash[:], txhash[:]) + u.Out.Index = uint32(index) + copy(u.Addr[:], receiver) + + w.UtxoStore.Lock() + // All newly saved utxos are first classified as unconfirmed. + utxos := w.UtxoStore.s.Unconfirmed + w.UtxoStore.s.Unconfirmed = append(utxos, u) + w.UtxoStore.dirty = true + w.UtxoStore.Unlock() + }() + + // Never remove this handler. + return false +} diff --git a/sockets.go b/sockets.go index 73576df..e10e6c1 100644 --- a/sockets.go +++ b/sockets.go @@ -319,7 +319,7 @@ func ListenAndServe() error { // requests for each channel in the set. go frontendListenerDuplicator() - // XXX(jrick): We need some sort of authentication before websocket + // TODO(jrick): We need some sort of authentication before websocket // connections are allowed, and perhaps TLS on the server as well. http.Handle("/frontend", websocket.Handler(frontendReqsNotifications)) if err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.SvrPort), nil); err != nil {