From d8c5222474306daae5abd97951c8453f00bad761 Mon Sep 17 00:00:00 2001 From: "Owain G. Ainsworth" Date: Mon, 4 Nov 2013 18:31:56 +0000 Subject: [PATCH] Rework the way we send notifications over the websocket Redo the datastructures we search so that we only do one lookup per txin and txout instead of doing a loop per wallet connection. Don't send spent data on tx notifications, this can be worked out in wallet and it is expensiveish to calculate. However we DO check upon getting a notification request if the output is already spent, and in which case we send an immediate notification to force a rescan. MinedTxNotfications are handled separately to the connected block messages largely to enable this to scale rather better. Tested by jrick (who found one bug i had introduced, thanks!) Additionally (accidentally squashed in): Add handlers for all known commands. We have handlers for all wallet-requiring commands that will return a suitable error. Unimplemented commands temporarily return an error stating so. --- rpcserver.go | 529 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 340 insertions(+), 189 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index f4dd1fae..ca905c88 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7,6 +7,7 @@ package main import ( "bytes" "code.google.com/p/go.net/websocket" + "container/list" "encoding/base64" "encoding/hex" "encoding/json" @@ -53,104 +54,171 @@ type rpcServer struct { // wsContext holds the items the RPC server needs to handle websocket // connections for wallets. type wsContext struct { - // walletListeners holds a map of each currently connected wallet - // listener as the key. The value is ignored, as this is only used as - // a set. A mutex is used to prevent incorrect multiple access. - walletListeners struct { - sync.RWMutex - m map[chan []byte]bool - } + sync.RWMutex - // requests holds all wallet notification requests. - requests wsRequests + // connections holds a map of each currently connected wallet + // listener as the key. + connections map[chan []byte]*requestContexts // Any chain notifications meant to be received by every connected // wallet are sent across this channel. walletNotificationMaster chan []byte + + // Map of address hash to list of notificationCtx. This is the global + // list we actually use for notifications, we also keep a list in the + // requestContexts to make removal from this list on connection close + // less horrendously expensive. + txNotifications map[string]*list.List + + // Map of outpoint to list of notificationCtx. + spentNotifications map[btcwire.OutPoint]*list.List + + // Map of shas to list of notificationCtx. + minedTxNotifications map[btcwire.ShaHash]*list.List } -// wsRequests maps request contexts for wallet notifications to a -// wallet notification channel. A Mutex is used to protect incorrect -// concurrent access to the map. -type wsRequests struct { - sync.Mutex - m map[chan []byte]*requestContexts -} - -// getOrCreateContexts gets the request contexts, or creates and adds a -// new context if one for this wallet is not already present. -func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *requestContexts { - rc, ok := r.m[walletNotification] - if !ok { - rc = &requestContexts{ - // The key is a stringified addressHash. - txRequests: make(map[string]interface{}), - - spentRequests: make(map[btcwire.OutPoint]interface{}), - minedTxRequests: make(map[btcwire.ShaHash]bool), - } - r.m[walletNotification] = rc - } - return rc +type notificationCtx struct { + id interface{} + connection chan []byte + rc *requestContexts } // AddTxRequest adds the request context for new transaction notifications. -func (r *wsRequests) AddTxRequest(walletNotification chan []byte, addrhash string, id interface{}) { +func (r *wsContext) AddTxRequest(walletNotification chan []byte, rc *requestContexts, addrhash string, id interface{}) { r.Lock() defer r.Unlock() - rc := r.getOrCreateContexts(walletNotification) + nc := ¬ificationCtx{ + id: id, + connection: walletNotification, + rc: rc, + } + + clist, ok := r.txNotifications[addrhash] + if !ok { + clist = list.New() + r.txNotifications[addrhash] = clist + } + + clist.PushBack(nc) + rc.txRequests[addrhash] = id } +func (r *wsContext) removeGlobalTxRequest(walletNotification chan []byte, addrhash string) { + clist := r.txNotifications[addrhash] + for e := clist.Front(); e != nil; e = e.Next() { + ctx := e.Value.(*notificationCtx) + if ctx.connection == walletNotification { + clist.Remove(e) + break + } + } + + if clist.Len() == 0 { + delete(r.txNotifications, addrhash) + } +} + // AddSpentRequest adds a request context for notifications of a spent // Outpoint. -func (r *wsRequests) AddSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint, id interface{}) { +func (r *wsContext) AddSpentRequest(walletNotification chan []byte, rc *requestContexts, op *btcwire.OutPoint, id interface{}) { r.Lock() defer r.Unlock() - rc := r.getOrCreateContexts(walletNotification) + nc := ¬ificationCtx{ + id: id, + connection: walletNotification, + rc: rc, + } + clist, ok := r.spentNotifications[*op] + if !ok { + clist = list.New() + r.spentNotifications[*op] = clist + } + clist.PushBack(nc) rc.spentRequests[*op] = id } +func (r *wsContext) removeGlobalSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) { + clist := r.spentNotifications[*op] + for e := clist.Front(); e != nil; e = e.Next() { + ctx := e.Value.(*notificationCtx) + if ctx.connection == walletNotification { + clist.Remove(e) + break + } + } + + if clist.Len() == 0 { + delete(r.spentNotifications, *op) + } +} + // RemoveSpentRequest removes a request context for notifications of a // spent Outpoint. -func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) { +func (r *wsContext) RemoveSpentRequest(walletNotification chan []byte, rc *requestContexts, op *btcwire.OutPoint) { r.Lock() defer r.Unlock() - rc := r.getOrCreateContexts(walletNotification) + r.removeGlobalSpentRequest(walletNotification, op) delete(rc.spentRequests, *op) } // AddMinedTxRequest adds request contexts for notifications of a // mined transaction. -func (r *wsRequests) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { +func (r *wsContext) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { r.Lock() defer r.Unlock() - rc := r.getOrCreateContexts(walletNotification) + rc := r.connections[walletNotification] + + nc := ¬ificationCtx{ + connection: walletNotification, + rc: rc, + } + clist, ok := r.minedTxNotifications[*txID] + if !ok { + clist = list.New() + r.minedTxNotifications[*txID] = clist + } + clist.PushBack(nc) rc.minedTxRequests[*txID] = true } +func (r *wsContext) removeGlobalMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { + clist := r.minedTxNotifications[*txID] + for e := clist.Front(); e != nil; e = e.Next() { + ctx := e.Value.(*notificationCtx) + if ctx.connection == walletNotification { + clist.Remove(e) + break + } + } + + if clist.Len() == 0 { + delete(r.minedTxNotifications, *txID) + } +} + // RemoveMinedTxRequest removes request contexts for notifications of a // mined transaction. -func (r *wsRequests) RemoveMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { +func (r *wsContext) RemoveMinedTxRequest(walletNotification chan []byte, rc *requestContexts, txID *btcwire.ShaHash) { r.Lock() defer r.Unlock() - rc := r.getOrCreateContexts(walletNotification) + r.removeGlobalMinedTxRequest(walletNotification, txID) delete(rc.minedTxRequests, *txID) } // CloseListeners removes all request contexts for notifications sent // to a wallet notification channel and closes the channel to stop all // goroutines currently serving that wallet. -func (r *wsRequests) CloseListeners(walletNotification chan []byte) { +func (r *wsContext) CloseListeners(walletNotification chan []byte) { r.Lock() defer r.Unlock() - delete(r.m, walletNotification) + delete(r.connections, walletNotification) close(walletNotification) } @@ -244,9 +312,11 @@ func newRPCServer(s *server) (*rpcServer, error) { rpc.password = cfg.RPCPass // initialize memory for websocket connections - rpc.ws.requests.m = make(map[chan []byte]*requestContexts) - rpc.ws.walletListeners.m = make(map[chan []byte]bool) + rpc.ws.connections = make(map[chan []byte]*requestContexts) rpc.ws.walletNotificationMaster = make(chan []byte) + rpc.ws.txNotifications = make(map[string]*list.List) + rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List) + rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List) // IPv4 listener. var listeners []net.Listener @@ -309,25 +379,80 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { type commandHandler func(*rpcServer, btcjson.Cmd, chan []byte) (interface{}, error) var handlers = map[string]commandHandler{ - "addnode": handleAddNode, - "decoderawtransaction": handleDecodeRawTransaction, - "getbestblockhash": handleGetBestBlockHash, - "getblock": handleGetBlock, - "getblockcount": handleGetBlockCount, - "getblockhash": handleGetBlockHash, - "getconnectioncount": handleGetConnectionCount, - "getdifficulty": handleGetDifficulty, - "getgenerate": handleGetGenerate, - "gethashespersec": handleGetHashesPerSec, - "getpeerinfo": handleGetPeerInfo, - "getrawmempool": handleGetRawMempool, - "getrawtransaction": handleGetRawTransaction, - "sendrawtransaction": handleSendRawTransaction, - "setgenerate": handleSetGenerate, - "stop": handleStop, + "addmultisigaddress": handleAskWallet, + "addnode": handleAddNode, + "backupwallet": handleAskWallet, + "createmultisig": handleAskWallet, + "createrawtransaction": handleUnimplemented, + "decoderawtransaction": handleDecodeRawTransaction, + "decodescript": handleUnimplemented, + "dumpprivkey": handleAskWallet, + "dumpwallet": handleAskWallet, + "encryptwallet": handleAskWallet, + "getaccount": handleAskWallet, + "getaccountaddress": handleAskWallet, + "getaddednodeinfo": handleUnimplemented, + "getaddressesbyaccount": handleAskWallet, + "getbalance": handleAskWallet, + "getbestblockhash": handleGetBestBlockHash, + "getblock": handleGetBlock, + "getblockcount": handleGetBlockCount, + "getblockhash": handleGetBlockHash, + "getblocktemplate": handleUnimplemented, + "getconnectioncount": handleGetConnectionCount, + "getdifficulty": handleGetDifficulty, + "getgenerate": handleGetGenerate, + "gethashespersec": handleGetHashesPerSec, + "getinfo": handleUnimplemented, + "getmininginfo": handleUnimplemented, + "getnettotals": handleUnimplemented, + "getnetworkhashps": handleUnimplemented, + "getnewaddress": handleUnimplemented, + "getpeerinfo": handleGetPeerInfo, + "getrawchangeaddress": handleAskWallet, + "getrawmempool": handleGetRawMempool, + "getrawtransaction": handleGetRawTransaction, + "getreceivedbyaccount": handleAskWallet, + "getreceivedbyaddress": handleAskWallet, + "gettransaction": handleAskWallet, + "gettxout": handleAskWallet, + "gettxoutsetinfo": handleAskWallet, + "getwork": handleUnimplemented, + "help": handleUnimplemented, + "importprivkey": handleAskWallet, + "importwallet": handleAskWallet, + "keypoolrefill": handleAskWallet, + "listaccounts": handleAskWallet, + "listaddressgroupings": handleAskWallet, + "listlockunspent": handleAskWallet, + "listreceivedbyaccount": handleAskWallet, + "listreceivedbyaddress": handleAskWallet, + "listsinceblock": handleAskWallet, + "listtransactions": handleAskWallet, + "listunspent": handleAskWallet, + "lockunspent": handleAskWallet, + "move": handleAskWallet, + "ping": handleUnimplemented, + "sendfrom": handleAskWallet, + "sendmany": handleAskWallet, + "sendrawtransaction": handleSendRawTransaction, + "sendtoaddress": handleAskWallet, + "setaccount": handleAskWallet, + "setgenerate": handleSetGenerate, + "settxfee": handleAskWallet, + "signmessage": handleAskWallet, + "signrawtransaction": handleAskWallet, + "stop": handleStop, + "submitblock": handleUnimplemented, + "validateaddress": handleAskWallet, + "verifychain": handleUnimplemented, + "verifymessage": handleAskWallet, + "walletlock": handleAskWallet, + "walletpassphrase": handleAskWallet, + "walletpassphrasechange": handleAskWallet, } -type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte) error +type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte, *requestContexts) error var wsHandlers = map[string]wsCommandHandler{ "getcurrentnet": handleGetCurrentNet, @@ -337,6 +462,21 @@ var wsHandlers = map[string]wsCommandHandler{ "notifyspent": handleNotifySpent, } +// handleUnimplemented is a temporary handler for commands that we should +// support but do not. +func handleUnimplemented(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) (interface{}, error) { + return nil, btcjson.ErrUnimplemented +} + +// handleAskWallet is the handler for commands that we do recognise as valid +// but that we can not answer correctly since it involves wallet state. +// These commands will be implemented in btcwallet. +func handleAskWallet(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) (interface{}, error) { + return nil, btcjson.ErrNoWallet +} + // handleDecodeRawTransaction handles decoderawtransaction commands. func handleAddNode(s *rpcServer, cmd btcjson.Cmd, walletNotification chan []byte) (interface{}, error) { @@ -640,7 +780,7 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, walletNotification // If called from websocket code, add a mined tx hashes // request. if walletNotification != nil { - s.ws.requests.AddMinedTxRequest(walletNotification, tx.Sha()) + s.ws.AddMinedTxRequest(walletNotification, tx.Sha()) } return tx.Sha().String(), nil @@ -738,7 +878,7 @@ func jsonRead(body []byte, s *rpcServer, walletNotification chan []byte) (reply // handleGetCurrentNet implements the getcurrentnet command extension // for websocket connections. func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, - walletNotification chan []byte) error { + walletNotification chan []byte, rc *requestContexts) error { id := cmd.Id() reply := &btcjson.Reply{Id: &id} @@ -759,7 +899,7 @@ func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, // handleGetBestBlock implements the getbestblock command extension // for websocket connections. func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, - walletNotification chan []byte) error { + walletNotification chan []byte, rc *requestContexts) error { id := cmd.Id() reply := &btcjson.Reply{Id: &id} @@ -784,7 +924,7 @@ func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, // handleRescan implements the rescan command extension for websocket // connections. func handleRescan(s *rpcServer, cmd btcjson.Cmd, - walletNotification chan []byte) error { + walletNotification chan []byte, rc *requestContexts) error { id := cmd.Id() reply := &btcjson.Reply{Id: &id} @@ -881,7 +1021,7 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd, // handleNotifyNewTXs implements the notifynewtxs command extension for // websocket connections. func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, - walletNotification chan []byte) error { + walletNotification chan []byte, rc *requestContexts) error { id := cmd.Id() reply := &btcjson.Reply{Id: &id} @@ -896,8 +1036,8 @@ func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, if err != nil { return fmt.Errorf("cannot decode address: %v", err) } - s.ws.requests.AddTxRequest(walletNotification, - string(hash[:]), id) + s.ws.AddTxRequest(walletNotification, rc, string(hash), + cmd.Id()) } mreply, _ := json.Marshal(reply) @@ -908,7 +1048,7 @@ func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, // handleNotifySpent implements the notifyspent command extension for // websocket connections. func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd, - walletNotification chan []byte) error { + walletNotification chan []byte, rc *requestContexts) error { id := cmd.Id() reply := &btcjson.Reply{Id: &id} @@ -918,14 +1058,17 @@ func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd, return btcjson.ErrInternal } - s.ws.requests.AddSpentRequest(walletNotification, notifyCmd.OutPoint, id) + s.ws.AddSpentRequest(walletNotification, rc, notifyCmd.OutPoint, + cmd.Id()) mreply, _ := json.Marshal(reply) walletNotification <- mreply return nil } -func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte) error { +func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte, + rc *requestContexts) error { + var reply btcjson.Reply cmd, err := btcjson.ParseMarshaledCmd(body) @@ -961,7 +1104,7 @@ func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte) error return btcjson.ErrMethodNotFound } - if err := wsHandler(s, cmd, walletNotification); err != nil { + if err := wsHandler(s, cmd, walletNotification, rc); err != nil { jsonErr, ok := err.(btcjson.Error) if ok { reply.Error = &jsonErr @@ -1007,17 +1150,37 @@ func getDifficultyRatio(bits uint32) float64 { // AddWalletListener adds a channel to listen for new messages from a // wallet. -func (s *rpcServer) AddWalletListener(c chan []byte) { - s.ws.walletListeners.Lock() - s.ws.walletListeners.m[c] = true - s.ws.walletListeners.Unlock() +func (s *rpcServer) AddWalletListener(c chan []byte) *requestContexts { + s.ws.Lock() + rc := &requestContexts{ + // The key is a stringified addressHash. + txRequests: make(map[string]interface{}), + + spentRequests: make(map[btcwire.OutPoint]interface{}), + minedTxRequests: make(map[btcwire.ShaHash]bool), + } + s.ws.connections[c] = rc + s.ws.Unlock() + + return rc } // RemoveWalletListener removes a wallet listener channel. -func (s *rpcServer) RemoveWalletListener(c chan []byte) { - s.ws.walletListeners.Lock() - delete(s.ws.walletListeners.m, c) - s.ws.walletListeners.Unlock() +func (s *rpcServer) RemoveWalletListener(c chan []byte, rc *requestContexts) { + s.ws.Lock() + + for k := range rc.txRequests { + s.ws.removeGlobalTxRequest(c, k) + } + for k := range rc.spentRequests { + s.ws.removeGlobalSpentRequest(c, &k) + } + for k := range rc.minedTxRequests { + s.ws.removeGlobalMinedTxRequest(c, &k) + } + + delete(s.ws.connections, c) + s.ws.Unlock() } // walletListenerDuplicator listens for new wallet listener channels @@ -1029,11 +1192,11 @@ func (s *rpcServer) walletListenerDuplicator() { for { select { case ntfn := <-s.ws.walletNotificationMaster: - s.ws.walletListeners.RLock() - for c := range s.ws.walletListeners.m { + s.ws.RLock() + for c := range s.ws.connections { c <- ntfn } - s.ws.walletListeners.RUnlock() + s.ws.RUnlock() case <-s.quit: return @@ -1048,8 +1211,8 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) { // Add wallet notification channel so this handler receives btcd chain // notifications. c := make(chan []byte) - s.AddWalletListener(c) - defer s.RemoveWalletListener(c) + rc := s.AddWalletListener(c) + defer s.RemoveWalletListener(c, rc) // msgs is a channel for all messages received over the websocket. msgs := make(chan []byte) @@ -1081,7 +1244,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) { return } // Handle request here. - go s.websocketJSONHandler(c, m) + go s.websocketJSONHandler(c, rc, m) case ntfn, _ := <-c: // Send btcd notification to btcwallet instance over // websocket. @@ -1098,7 +1261,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) { // websocketJSONHandler parses and handles a marshalled json message, // sending the marshalled reply to a wallet notification channel. -func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) { +func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, rc *requestContexts, msg []byte) { s.wg.Add(1) reply, err := jsonRead(msg, s, walletNotification) s.wg.Done() @@ -1114,7 +1277,7 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b // Try websocket extensions s.wg.Add(1) - err = jsonWSRead(msg, s, walletNotification) + err = jsonWSRead(msg, s, walletNotification, rc) s.wg.Done() } @@ -1122,57 +1285,51 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b // of a new block connected to the main chain. The notification is sent // to each connected wallet. func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) { - s.ws.walletListeners.RLock() - for wltNtfn := range s.ws.walletListeners.m { - // Create notification with basic information filled in. - // This data is the same for every connected wallet. - hash, err := block.Sha() - if err != nil { - log.Error("Bad block; connected block notification dropped.") - return - } - ntfnResult := struct { + s.ws.RLock() + hash, err := block.Sha() + if err != nil { + log.Error("Bad block; connected block notification dropped.") + return + } + + var id interface{} = "btcd:blockconnected" + ntfn := btcjson.Reply{ + Result: struct { Hash string `json:"hash"` Height int64 `json:"height"` MinedTXs []string `json:"minedtxs"` }{ Hash: hash.String(), Height: block.Height(), - } + }, + Id: &id, + } + m, _ := json.Marshal(ntfn) + s.ws.walletNotificationMaster <- m - // Fill in additional wallet-specific notifications. If there - // is no request context for this wallet, no need to give this - // wallet any extra notifications. - if cxt := s.ws.requests.m[wltNtfn]; cxt != nil { - // Create list of all txs created by this wallet that appear in this - // block. - minedTxShas := make([]string, 0, len(cxt.minedTxRequests)) + // Inform any interested parties about txs mined in this block. + for _, tx := range block.Transactions() { + if clist, ok := s.ws.minedTxNotifications[*tx.Sha()]; ok { + for e := clist.Front(); e != nil; e = e.Next() { + ctx := e.Value.(*notificationCtx) - // TxShas does not return a non-nil error. - txShaList, _ := block.TxShas() - txList := s.server.db.FetchTxByShaList(txShaList) - for _, txReply := range txList { - if txReply.Err != nil { + var id interface{} = "btcd:txmined" + reply := btcjson.Reply{ + Result: tx.Sha().String(), + Id: &id, + } + replyBytes, err := json.Marshal(reply) + if err != nil { + log.Errorf("RPCS: Unable to marshal mined tx notification: %v", err) continue } - if _, ok := cxt.minedTxRequests[*txReply.Sha]; ok { - minedTxShas = append(minedTxShas, txReply.Sha.String()) - s.ws.requests.RemoveMinedTxRequest(wltNtfn, txReply.Sha) - } + ctx.connection <- replyBytes + s.ws.RemoveMinedTxRequest(ctx.connection, ctx.rc, + tx.Sha()) } - - ntfnResult.MinedTXs = minedTxShas } - - var id interface{} = "btcd:blockconnected" - ntfn := btcjson.Reply{ - Result: ntfnResult, - Id: &id, - } - m, _ := json.Marshal(ntfn) - wltNtfn <- m } - s.ws.walletListeners.RUnlock() + s.ws.RUnlock() } // NotifyBlockDisconnected creates and marshals a JSON message to notify @@ -1203,66 +1360,59 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) { // of new transactions (with both spent and unspent outputs) for a watched // address. func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) { - // Build a map of in-flight transactions to see if any of the inputs in - // this block are referencing other transactions earlier in this block. - txInFlight := map[btcwire.ShaHash]int{} - transactions := block.Transactions() - spent := make([][]bool, len(transactions)) - for i, tx := range transactions { - spent[i] = make([]bool, len(tx.MsgTx().TxOut)) - txInFlight[*tx.Sha()] = i + for _, tx := range block.Transactions() { + s.newBlockNotifyCheckTxIn(tx) + s.newBlockNotifyCheckTxOut(block, tx) } +} - // The newBlockNotifyCheckTxOut current needs spent data. This can - // this can ultimately be optimized out by making sure the notifications - // are in order. For now, just create the spent data. - for i, tx := range transactions[1:] { - for _, txIn := range tx.MsgTx().TxIn { - originHash := &txIn.PreviousOutpoint.Hash - if inFlightIndex, ok := txInFlight[*originHash]; ok && - i >= inFlightIndex { - - prevIndex := txIn.PreviousOutpoint.Index - spent[inFlightIndex][prevIndex] = true - } +func notifySpentData(ctx *notificationCtx, txhash *btcwire.ShaHash, index uint32, + spender *btcutil.Tx) { + txStr := "" + if spender != nil { + var buf bytes.Buffer + err := spender.MsgTx().Serialize(&buf) + if err != nil { + // This really shouldn't ever happen... + log.Warnf("RPCS: Can't serialize tx: %v", err) + return } + txStr = string(buf.Bytes()) } - for i, tx := range transactions { - go s.newBlockNotifyCheckTxIn(tx) - go s.newBlockNotifyCheckTxOut(block, tx, spent[i]) + reply := &btcjson.Reply{ + Result: struct { + TxHash string `json:"txhash"` + Index uint32 `json:"index"` + SpendingTx string `json:"spender,omitempty"` + }{ + TxHash: txhash.String(), + Index: index, + SpendingTx: txStr, + }, + Error: nil, + Id: &ctx.id, } + replyBytes, err := json.Marshal(reply) + if err != nil { + log.Errorf("RPCS: Unable to marshal spent notification: %v", err) + return + } + ctx.connection <- replyBytes } // newBlockNotifyCheckTxIn is a helper function to iterate through // each transaction input of a new block and perform any checks and // notify listening frontends when necessary. func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) { - for wltNtfn, cxt := range s.ws.requests.m { - for _, txin := range tx.MsgTx().TxIn { - for op, id := range cxt.spentRequests { - if txin.PreviousOutpoint != op { - continue - } - - reply := &btcjson.Reply{ - Result: struct { - TxHash string `json:"txhash"` - Index uint32 `json:"index"` - }{ - TxHash: op.Hash.String(), - Index: uint32(op.Index), - }, - Error: nil, - Id: &id, - } - replyBytes, err := json.Marshal(reply) - if err != nil { - log.Errorf("RPCS: Unable to marshal spent notification: %v", err) - continue - } - wltNtfn <- replyBytes - s.ws.requests.RemoveSpentRequest(wltNtfn, &op) + for _, txin := range tx.MsgTx().TxIn { + if clist, ok := s.ws.spentNotifications[txin.PreviousOutpoint]; ok { + for e := clist.Front(); e != nil; e = e.Next() { + ctx := e.Value.(*notificationCtx) + notifySpentData(ctx, &txin.PreviousOutpoint.Hash, + uint32(txin.PreviousOutpoint.Index), tx) + s.ws.RemoveSpentRequest(ctx.connection, ctx.rc, + &txin.PreviousOutpoint) } } } @@ -1271,15 +1421,18 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) { // newBlockNotifyCheckTxOut is a helper function to iterate through // each transaction output of a new block and perform any checks and // notify listening frontends when necessary. -func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.Tx, spent []bool) { - for wltNtfn, cxt := range s.ws.requests.m { - for i, txout := range tx.MsgTx().TxOut { - _, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript) - if err != nil { - log.Debug("Error getting payment address from tx; dropping any Tx notifications.") - break - } - if id, ok := cxt.txRequests[string(txaddrhash)]; ok { +func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.Tx) { + + for i, txout := range tx.MsgTx().TxOut { + _, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript) + if err != nil { + log.Debug("Error getting payment address from tx; dropping any Tx notifications.") + break + } + if idlist, ok := s.ws.txNotifications[string(txaddrhash)]; ok { + for e := idlist.Front(); e != nil; e = e.Next() { + ctx := e.Value.(*notificationCtx) + blkhash, err := block.Sha() if err != nil { log.Error("Error getting block sha; dropping Tx notification.") @@ -1300,7 +1453,6 @@ func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.T Index uint32 `json:"index"` Amount int64 `json:"amount"` PkScript string `json:"pkscript"` - Spent bool `json:"spent"` }{ Sender: "Unknown", // TODO(jrick) Receiver: txaddr, @@ -1310,17 +1462,16 @@ func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.T Index: uint32(i), Amount: txout.Value, PkScript: btcutil.Base58Encode(txout.PkScript), - Spent: spent[i], }, Error: nil, - Id: &id, + Id: &ctx.id, } replyBytes, err := json.Marshal(reply) if err != nil { log.Errorf("RPCS: Unable to marshal tx notification: %v", err) continue } - wltNtfn <- replyBytes + ctx.connection <- replyBytes } } }