Rework the way we send notifications over the websocket

Redo the datastructures we search so that we only do one lookup per txin and
txout instead of doing a loop per wallet connection.

Don't send spent data on tx notifications, this can be worked out in wallet and
it is expensiveish to calculate. However we DO check upon getting a notification
request if the output is already spent, and in which case we send an immediate
notification to force a rescan.

MinedTxNotfications are handled separately to the connected block messages
largely to enable this to scale rather better.

Tested by jrick (who found one bug i had introduced, thanks!)

Additionally (accidentally squashed in):

Add handlers for all known commands.

We have handlers for all wallet-requiring commands that will return a suitable
error.

Unimplemented commands temporarily return an error stating so.
This commit is contained in:
Owain G. Ainsworth 2013-11-04 18:31:56 +00:00
parent 8c2b9ae06e
commit d8c5222474

View file

@ -7,6 +7,7 @@ package main
import ( import (
"bytes" "bytes"
"code.google.com/p/go.net/websocket" "code.google.com/p/go.net/websocket"
"container/list"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -53,104 +54,171 @@ type rpcServer struct {
// wsContext holds the items the RPC server needs to handle websocket // wsContext holds the items the RPC server needs to handle websocket
// connections for wallets. // connections for wallets.
type wsContext struct { type wsContext struct {
// walletListeners holds a map of each currently connected wallet
// listener as the key. The value is ignored, as this is only used as
// a set. A mutex is used to prevent incorrect multiple access.
walletListeners struct {
sync.RWMutex sync.RWMutex
m map[chan []byte]bool
}
// requests holds all wallet notification requests. // connections holds a map of each currently connected wallet
requests wsRequests // listener as the key.
connections map[chan []byte]*requestContexts
// Any chain notifications meant to be received by every connected // Any chain notifications meant to be received by every connected
// wallet are sent across this channel. // wallet are sent across this channel.
walletNotificationMaster chan []byte walletNotificationMaster chan []byte
// Map of address hash to list of notificationCtx. This is the global
// list we actually use for notifications, we also keep a list in the
// requestContexts to make removal from this list on connection close
// less horrendously expensive.
txNotifications map[string]*list.List
// Map of outpoint to list of notificationCtx.
spentNotifications map[btcwire.OutPoint]*list.List
// Map of shas to list of notificationCtx.
minedTxNotifications map[btcwire.ShaHash]*list.List
} }
// wsRequests maps request contexts for wallet notifications to a type notificationCtx struct {
// wallet notification channel. A Mutex is used to protect incorrect id interface{}
// concurrent access to the map. connection chan []byte
type wsRequests struct { rc *requestContexts
sync.Mutex
m map[chan []byte]*requestContexts
}
// getOrCreateContexts gets the request contexts, or creates and adds a
// new context if one for this wallet is not already present.
func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *requestContexts {
rc, ok := r.m[walletNotification]
if !ok {
rc = &requestContexts{
// The key is a stringified addressHash.
txRequests: make(map[string]interface{}),
spentRequests: make(map[btcwire.OutPoint]interface{}),
minedTxRequests: make(map[btcwire.ShaHash]bool),
}
r.m[walletNotification] = rc
}
return rc
} }
// AddTxRequest adds the request context for new transaction notifications. // AddTxRequest adds the request context for new transaction notifications.
func (r *wsRequests) AddTxRequest(walletNotification chan []byte, addrhash string, id interface{}) { func (r *wsContext) AddTxRequest(walletNotification chan []byte, rc *requestContexts, addrhash string, id interface{}) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification) nc := &notificationCtx{
id: id,
connection: walletNotification,
rc: rc,
}
clist, ok := r.txNotifications[addrhash]
if !ok {
clist = list.New()
r.txNotifications[addrhash] = clist
}
clist.PushBack(nc)
rc.txRequests[addrhash] = id rc.txRequests[addrhash] = id
} }
func (r *wsContext) removeGlobalTxRequest(walletNotification chan []byte, addrhash string) {
clist := r.txNotifications[addrhash]
for e := clist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
if ctx.connection == walletNotification {
clist.Remove(e)
break
}
}
if clist.Len() == 0 {
delete(r.txNotifications, addrhash)
}
}
// AddSpentRequest adds a request context for notifications of a spent // AddSpentRequest adds a request context for notifications of a spent
// Outpoint. // Outpoint.
func (r *wsRequests) AddSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint, id interface{}) { func (r *wsContext) AddSpentRequest(walletNotification chan []byte, rc *requestContexts, op *btcwire.OutPoint, id interface{}) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification) nc := &notificationCtx{
id: id,
connection: walletNotification,
rc: rc,
}
clist, ok := r.spentNotifications[*op]
if !ok {
clist = list.New()
r.spentNotifications[*op] = clist
}
clist.PushBack(nc)
rc.spentRequests[*op] = id rc.spentRequests[*op] = id
} }
func (r *wsContext) removeGlobalSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) {
clist := r.spentNotifications[*op]
for e := clist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
if ctx.connection == walletNotification {
clist.Remove(e)
break
}
}
if clist.Len() == 0 {
delete(r.spentNotifications, *op)
}
}
// RemoveSpentRequest removes a request context for notifications of a // RemoveSpentRequest removes a request context for notifications of a
// spent Outpoint. // spent Outpoint.
func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) { func (r *wsContext) RemoveSpentRequest(walletNotification chan []byte, rc *requestContexts, op *btcwire.OutPoint) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification) r.removeGlobalSpentRequest(walletNotification, op)
delete(rc.spentRequests, *op) delete(rc.spentRequests, *op)
} }
// AddMinedTxRequest adds request contexts for notifications of a // AddMinedTxRequest adds request contexts for notifications of a
// mined transaction. // mined transaction.
func (r *wsRequests) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { func (r *wsContext) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification) rc := r.connections[walletNotification]
nc := &notificationCtx{
connection: walletNotification,
rc: rc,
}
clist, ok := r.minedTxNotifications[*txID]
if !ok {
clist = list.New()
r.minedTxNotifications[*txID] = clist
}
clist.PushBack(nc)
rc.minedTxRequests[*txID] = true rc.minedTxRequests[*txID] = true
} }
func (r *wsContext) removeGlobalMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) {
clist := r.minedTxNotifications[*txID]
for e := clist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
if ctx.connection == walletNotification {
clist.Remove(e)
break
}
}
if clist.Len() == 0 {
delete(r.minedTxNotifications, *txID)
}
}
// RemoveMinedTxRequest removes request contexts for notifications of a // RemoveMinedTxRequest removes request contexts for notifications of a
// mined transaction. // mined transaction.
func (r *wsRequests) RemoveMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { func (r *wsContext) RemoveMinedTxRequest(walletNotification chan []byte, rc *requestContexts, txID *btcwire.ShaHash) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification) r.removeGlobalMinedTxRequest(walletNotification, txID)
delete(rc.minedTxRequests, *txID) delete(rc.minedTxRequests, *txID)
} }
// CloseListeners removes all request contexts for notifications sent // CloseListeners removes all request contexts for notifications sent
// to a wallet notification channel and closes the channel to stop all // to a wallet notification channel and closes the channel to stop all
// goroutines currently serving that wallet. // goroutines currently serving that wallet.
func (r *wsRequests) CloseListeners(walletNotification chan []byte) { func (r *wsContext) CloseListeners(walletNotification chan []byte) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
delete(r.m, walletNotification) delete(r.connections, walletNotification)
close(walletNotification) close(walletNotification)
} }
@ -244,9 +312,11 @@ func newRPCServer(s *server) (*rpcServer, error) {
rpc.password = cfg.RPCPass rpc.password = cfg.RPCPass
// initialize memory for websocket connections // initialize memory for websocket connections
rpc.ws.requests.m = make(map[chan []byte]*requestContexts) rpc.ws.connections = make(map[chan []byte]*requestContexts)
rpc.ws.walletListeners.m = make(map[chan []byte]bool)
rpc.ws.walletNotificationMaster = make(chan []byte) rpc.ws.walletNotificationMaster = make(chan []byte)
rpc.ws.txNotifications = make(map[string]*list.List)
rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List)
rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List)
// IPv4 listener. // IPv4 listener.
var listeners []net.Listener var listeners []net.Listener
@ -309,25 +379,80 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
type commandHandler func(*rpcServer, btcjson.Cmd, chan []byte) (interface{}, error) type commandHandler func(*rpcServer, btcjson.Cmd, chan []byte) (interface{}, error)
var handlers = map[string]commandHandler{ var handlers = map[string]commandHandler{
"addmultisigaddress": handleAskWallet,
"addnode": handleAddNode, "addnode": handleAddNode,
"backupwallet": handleAskWallet,
"createmultisig": handleAskWallet,
"createrawtransaction": handleUnimplemented,
"decoderawtransaction": handleDecodeRawTransaction, "decoderawtransaction": handleDecodeRawTransaction,
"decodescript": handleUnimplemented,
"dumpprivkey": handleAskWallet,
"dumpwallet": handleAskWallet,
"encryptwallet": handleAskWallet,
"getaccount": handleAskWallet,
"getaccountaddress": handleAskWallet,
"getaddednodeinfo": handleUnimplemented,
"getaddressesbyaccount": handleAskWallet,
"getbalance": handleAskWallet,
"getbestblockhash": handleGetBestBlockHash, "getbestblockhash": handleGetBestBlockHash,
"getblock": handleGetBlock, "getblock": handleGetBlock,
"getblockcount": handleGetBlockCount, "getblockcount": handleGetBlockCount,
"getblockhash": handleGetBlockHash, "getblockhash": handleGetBlockHash,
"getblocktemplate": handleUnimplemented,
"getconnectioncount": handleGetConnectionCount, "getconnectioncount": handleGetConnectionCount,
"getdifficulty": handleGetDifficulty, "getdifficulty": handleGetDifficulty,
"getgenerate": handleGetGenerate, "getgenerate": handleGetGenerate,
"gethashespersec": handleGetHashesPerSec, "gethashespersec": handleGetHashesPerSec,
"getinfo": handleUnimplemented,
"getmininginfo": handleUnimplemented,
"getnettotals": handleUnimplemented,
"getnetworkhashps": handleUnimplemented,
"getnewaddress": handleUnimplemented,
"getpeerinfo": handleGetPeerInfo, "getpeerinfo": handleGetPeerInfo,
"getrawchangeaddress": handleAskWallet,
"getrawmempool": handleGetRawMempool, "getrawmempool": handleGetRawMempool,
"getrawtransaction": handleGetRawTransaction, "getrawtransaction": handleGetRawTransaction,
"getreceivedbyaccount": handleAskWallet,
"getreceivedbyaddress": handleAskWallet,
"gettransaction": handleAskWallet,
"gettxout": handleAskWallet,
"gettxoutsetinfo": handleAskWallet,
"getwork": handleUnimplemented,
"help": handleUnimplemented,
"importprivkey": handleAskWallet,
"importwallet": handleAskWallet,
"keypoolrefill": handleAskWallet,
"listaccounts": handleAskWallet,
"listaddressgroupings": handleAskWallet,
"listlockunspent": handleAskWallet,
"listreceivedbyaccount": handleAskWallet,
"listreceivedbyaddress": handleAskWallet,
"listsinceblock": handleAskWallet,
"listtransactions": handleAskWallet,
"listunspent": handleAskWallet,
"lockunspent": handleAskWallet,
"move": handleAskWallet,
"ping": handleUnimplemented,
"sendfrom": handleAskWallet,
"sendmany": handleAskWallet,
"sendrawtransaction": handleSendRawTransaction, "sendrawtransaction": handleSendRawTransaction,
"sendtoaddress": handleAskWallet,
"setaccount": handleAskWallet,
"setgenerate": handleSetGenerate, "setgenerate": handleSetGenerate,
"settxfee": handleAskWallet,
"signmessage": handleAskWallet,
"signrawtransaction": handleAskWallet,
"stop": handleStop, "stop": handleStop,
"submitblock": handleUnimplemented,
"validateaddress": handleAskWallet,
"verifychain": handleUnimplemented,
"verifymessage": handleAskWallet,
"walletlock": handleAskWallet,
"walletpassphrase": handleAskWallet,
"walletpassphrasechange": handleAskWallet,
} }
type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte) error type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte, *requestContexts) error
var wsHandlers = map[string]wsCommandHandler{ var wsHandlers = map[string]wsCommandHandler{
"getcurrentnet": handleGetCurrentNet, "getcurrentnet": handleGetCurrentNet,
@ -337,6 +462,21 @@ var wsHandlers = map[string]wsCommandHandler{
"notifyspent": handleNotifySpent, "notifyspent": handleNotifySpent,
} }
// handleUnimplemented is a temporary handler for commands that we should
// support but do not.
func handleUnimplemented(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) (interface{}, error) {
return nil, btcjson.ErrUnimplemented
}
// handleAskWallet is the handler for commands that we do recognise as valid
// but that we can not answer correctly since it involves wallet state.
// These commands will be implemented in btcwallet.
func handleAskWallet(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) (interface{}, error) {
return nil, btcjson.ErrNoWallet
}
// handleDecodeRawTransaction handles decoderawtransaction commands. // handleDecodeRawTransaction handles decoderawtransaction commands.
func handleAddNode(s *rpcServer, cmd btcjson.Cmd, func handleAddNode(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) (interface{}, error) { walletNotification chan []byte) (interface{}, error) {
@ -640,7 +780,7 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, walletNotification
// If called from websocket code, add a mined tx hashes // If called from websocket code, add a mined tx hashes
// request. // request.
if walletNotification != nil { if walletNotification != nil {
s.ws.requests.AddMinedTxRequest(walletNotification, tx.Sha()) s.ws.AddMinedTxRequest(walletNotification, tx.Sha())
} }
return tx.Sha().String(), nil return tx.Sha().String(), nil
@ -738,7 +878,7 @@ func jsonRead(body []byte, s *rpcServer, walletNotification chan []byte) (reply
// handleGetCurrentNet implements the getcurrentnet command extension // handleGetCurrentNet implements the getcurrentnet command extension
// for websocket connections. // for websocket connections.
func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) error { walletNotification chan []byte, rc *requestContexts) error {
id := cmd.Id() id := cmd.Id()
reply := &btcjson.Reply{Id: &id} reply := &btcjson.Reply{Id: &id}
@ -759,7 +899,7 @@ func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd,
// handleGetBestBlock implements the getbestblock command extension // handleGetBestBlock implements the getbestblock command extension
// for websocket connections. // for websocket connections.
func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) error { walletNotification chan []byte, rc *requestContexts) error {
id := cmd.Id() id := cmd.Id()
reply := &btcjson.Reply{Id: &id} reply := &btcjson.Reply{Id: &id}
@ -784,7 +924,7 @@ func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd,
// handleRescan implements the rescan command extension for websocket // handleRescan implements the rescan command extension for websocket
// connections. // connections.
func handleRescan(s *rpcServer, cmd btcjson.Cmd, func handleRescan(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) error { walletNotification chan []byte, rc *requestContexts) error {
id := cmd.Id() id := cmd.Id()
reply := &btcjson.Reply{Id: &id} reply := &btcjson.Reply{Id: &id}
@ -881,7 +1021,7 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd,
// handleNotifyNewTXs implements the notifynewtxs command extension for // handleNotifyNewTXs implements the notifynewtxs command extension for
// websocket connections. // websocket connections.
func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) error { walletNotification chan []byte, rc *requestContexts) error {
id := cmd.Id() id := cmd.Id()
reply := &btcjson.Reply{Id: &id} reply := &btcjson.Reply{Id: &id}
@ -896,8 +1036,8 @@ func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd,
if err != nil { if err != nil {
return fmt.Errorf("cannot decode address: %v", err) return fmt.Errorf("cannot decode address: %v", err)
} }
s.ws.requests.AddTxRequest(walletNotification, s.ws.AddTxRequest(walletNotification, rc, string(hash),
string(hash[:]), id) cmd.Id())
} }
mreply, _ := json.Marshal(reply) mreply, _ := json.Marshal(reply)
@ -908,7 +1048,7 @@ func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd,
// handleNotifySpent implements the notifyspent command extension for // handleNotifySpent implements the notifyspent command extension for
// websocket connections. // websocket connections.
func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd, func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte) error { walletNotification chan []byte, rc *requestContexts) error {
id := cmd.Id() id := cmd.Id()
reply := &btcjson.Reply{Id: &id} reply := &btcjson.Reply{Id: &id}
@ -918,14 +1058,17 @@ func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd,
return btcjson.ErrInternal return btcjson.ErrInternal
} }
s.ws.requests.AddSpentRequest(walletNotification, notifyCmd.OutPoint, id) s.ws.AddSpentRequest(walletNotification, rc, notifyCmd.OutPoint,
cmd.Id())
mreply, _ := json.Marshal(reply) mreply, _ := json.Marshal(reply)
walletNotification <- mreply walletNotification <- mreply
return nil return nil
} }
func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte) error { func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte,
rc *requestContexts) error {
var reply btcjson.Reply var reply btcjson.Reply
cmd, err := btcjson.ParseMarshaledCmd(body) cmd, err := btcjson.ParseMarshaledCmd(body)
@ -961,7 +1104,7 @@ func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte) error
return btcjson.ErrMethodNotFound return btcjson.ErrMethodNotFound
} }
if err := wsHandler(s, cmd, walletNotification); err != nil { if err := wsHandler(s, cmd, walletNotification, rc); err != nil {
jsonErr, ok := err.(btcjson.Error) jsonErr, ok := err.(btcjson.Error)
if ok { if ok {
reply.Error = &jsonErr reply.Error = &jsonErr
@ -1007,17 +1150,37 @@ func getDifficultyRatio(bits uint32) float64 {
// AddWalletListener adds a channel to listen for new messages from a // AddWalletListener adds a channel to listen for new messages from a
// wallet. // wallet.
func (s *rpcServer) AddWalletListener(c chan []byte) { func (s *rpcServer) AddWalletListener(c chan []byte) *requestContexts {
s.ws.walletListeners.Lock() s.ws.Lock()
s.ws.walletListeners.m[c] = true rc := &requestContexts{
s.ws.walletListeners.Unlock() // The key is a stringified addressHash.
txRequests: make(map[string]interface{}),
spentRequests: make(map[btcwire.OutPoint]interface{}),
minedTxRequests: make(map[btcwire.ShaHash]bool),
}
s.ws.connections[c] = rc
s.ws.Unlock()
return rc
} }
// RemoveWalletListener removes a wallet listener channel. // RemoveWalletListener removes a wallet listener channel.
func (s *rpcServer) RemoveWalletListener(c chan []byte) { func (s *rpcServer) RemoveWalletListener(c chan []byte, rc *requestContexts) {
s.ws.walletListeners.Lock() s.ws.Lock()
delete(s.ws.walletListeners.m, c)
s.ws.walletListeners.Unlock() for k := range rc.txRequests {
s.ws.removeGlobalTxRequest(c, k)
}
for k := range rc.spentRequests {
s.ws.removeGlobalSpentRequest(c, &k)
}
for k := range rc.minedTxRequests {
s.ws.removeGlobalMinedTxRequest(c, &k)
}
delete(s.ws.connections, c)
s.ws.Unlock()
} }
// walletListenerDuplicator listens for new wallet listener channels // walletListenerDuplicator listens for new wallet listener channels
@ -1029,11 +1192,11 @@ func (s *rpcServer) walletListenerDuplicator() {
for { for {
select { select {
case ntfn := <-s.ws.walletNotificationMaster: case ntfn := <-s.ws.walletNotificationMaster:
s.ws.walletListeners.RLock() s.ws.RLock()
for c := range s.ws.walletListeners.m { for c := range s.ws.connections {
c <- ntfn c <- ntfn
} }
s.ws.walletListeners.RUnlock() s.ws.RUnlock()
case <-s.quit: case <-s.quit:
return return
@ -1048,8 +1211,8 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// Add wallet notification channel so this handler receives btcd chain // Add wallet notification channel so this handler receives btcd chain
// notifications. // notifications.
c := make(chan []byte) c := make(chan []byte)
s.AddWalletListener(c) rc := s.AddWalletListener(c)
defer s.RemoveWalletListener(c) defer s.RemoveWalletListener(c, rc)
// msgs is a channel for all messages received over the websocket. // msgs is a channel for all messages received over the websocket.
msgs := make(chan []byte) msgs := make(chan []byte)
@ -1081,7 +1244,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
return return
} }
// Handle request here. // Handle request here.
go s.websocketJSONHandler(c, m) go s.websocketJSONHandler(c, rc, m)
case ntfn, _ := <-c: case ntfn, _ := <-c:
// Send btcd notification to btcwallet instance over // Send btcd notification to btcwallet instance over
// websocket. // websocket.
@ -1098,7 +1261,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// websocketJSONHandler parses and handles a marshalled json message, // websocketJSONHandler parses and handles a marshalled json message,
// sending the marshalled reply to a wallet notification channel. // sending the marshalled reply to a wallet notification channel.
func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) { func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, rc *requestContexts, msg []byte) {
s.wg.Add(1) s.wg.Add(1)
reply, err := jsonRead(msg, s, walletNotification) reply, err := jsonRead(msg, s, walletNotification)
s.wg.Done() s.wg.Done()
@ -1114,7 +1277,7 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b
// Try websocket extensions // Try websocket extensions
s.wg.Add(1) s.wg.Add(1)
err = jsonWSRead(msg, s, walletNotification) err = jsonWSRead(msg, s, walletNotification, rc)
s.wg.Done() s.wg.Done()
} }
@ -1122,57 +1285,51 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b
// of a new block connected to the main chain. The notification is sent // of a new block connected to the main chain. The notification is sent
// to each connected wallet. // to each connected wallet.
func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) { func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
s.ws.walletListeners.RLock() s.ws.RLock()
for wltNtfn := range s.ws.walletListeners.m {
// Create notification with basic information filled in.
// This data is the same for every connected wallet.
hash, err := block.Sha() hash, err := block.Sha()
if err != nil { if err != nil {
log.Error("Bad block; connected block notification dropped.") log.Error("Bad block; connected block notification dropped.")
return return
} }
ntfnResult := struct {
var id interface{} = "btcd:blockconnected"
ntfn := btcjson.Reply{
Result: struct {
Hash string `json:"hash"` Hash string `json:"hash"`
Height int64 `json:"height"` Height int64 `json:"height"`
MinedTXs []string `json:"minedtxs"` MinedTXs []string `json:"minedtxs"`
}{ }{
Hash: hash.String(), Hash: hash.String(),
Height: block.Height(), Height: block.Height(),
} },
// Fill in additional wallet-specific notifications. If there
// is no request context for this wallet, no need to give this
// wallet any extra notifications.
if cxt := s.ws.requests.m[wltNtfn]; cxt != nil {
// Create list of all txs created by this wallet that appear in this
// block.
minedTxShas := make([]string, 0, len(cxt.minedTxRequests))
// TxShas does not return a non-nil error.
txShaList, _ := block.TxShas()
txList := s.server.db.FetchTxByShaList(txShaList)
for _, txReply := range txList {
if txReply.Err != nil {
continue
}
if _, ok := cxt.minedTxRequests[*txReply.Sha]; ok {
minedTxShas = append(minedTxShas, txReply.Sha.String())
s.ws.requests.RemoveMinedTxRequest(wltNtfn, txReply.Sha)
}
}
ntfnResult.MinedTXs = minedTxShas
}
var id interface{} = "btcd:blockconnected"
ntfn := btcjson.Reply{
Result: ntfnResult,
Id: &id, Id: &id,
} }
m, _ := json.Marshal(ntfn) m, _ := json.Marshal(ntfn)
wltNtfn <- m s.ws.walletNotificationMaster <- m
// Inform any interested parties about txs mined in this block.
for _, tx := range block.Transactions() {
if clist, ok := s.ws.minedTxNotifications[*tx.Sha()]; ok {
for e := clist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
var id interface{} = "btcd:txmined"
reply := btcjson.Reply{
Result: tx.Sha().String(),
Id: &id,
} }
s.ws.walletListeners.RUnlock() replyBytes, err := json.Marshal(reply)
if err != nil {
log.Errorf("RPCS: Unable to marshal mined tx notification: %v", err)
continue
}
ctx.connection <- replyBytes
s.ws.RemoveMinedTxRequest(ctx.connection, ctx.rc,
tx.Sha())
}
}
}
s.ws.RUnlock()
} }
// NotifyBlockDisconnected creates and marshals a JSON message to notify // NotifyBlockDisconnected creates and marshals a JSON message to notify
@ -1203,66 +1360,59 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) {
// of new transactions (with both spent and unspent outputs) for a watched // of new transactions (with both spent and unspent outputs) for a watched
// address. // address.
func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) { func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) {
// Build a map of in-flight transactions to see if any of the inputs in for _, tx := range block.Transactions() {
// this block are referencing other transactions earlier in this block. s.newBlockNotifyCheckTxIn(tx)
txInFlight := map[btcwire.ShaHash]int{} s.newBlockNotifyCheckTxOut(block, tx)
transactions := block.Transactions()
spent := make([][]bool, len(transactions))
for i, tx := range transactions {
spent[i] = make([]bool, len(tx.MsgTx().TxOut))
txInFlight[*tx.Sha()] = i
}
// The newBlockNotifyCheckTxOut current needs spent data. This can
// this can ultimately be optimized out by making sure the notifications
// are in order. For now, just create the spent data.
for i, tx := range transactions[1:] {
for _, txIn := range tx.MsgTx().TxIn {
originHash := &txIn.PreviousOutpoint.Hash
if inFlightIndex, ok := txInFlight[*originHash]; ok &&
i >= inFlightIndex {
prevIndex := txIn.PreviousOutpoint.Index
spent[inFlightIndex][prevIndex] = true
}
}
}
for i, tx := range transactions {
go s.newBlockNotifyCheckTxIn(tx)
go s.newBlockNotifyCheckTxOut(block, tx, spent[i])
} }
} }
// newBlockNotifyCheckTxIn is a helper function to iterate through func notifySpentData(ctx *notificationCtx, txhash *btcwire.ShaHash, index uint32,
// each transaction input of a new block and perform any checks and spender *btcutil.Tx) {
// notify listening frontends when necessary. txStr := ""
func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) { if spender != nil {
for wltNtfn, cxt := range s.ws.requests.m { var buf bytes.Buffer
for _, txin := range tx.MsgTx().TxIn { err := spender.MsgTx().Serialize(&buf)
for op, id := range cxt.spentRequests { if err != nil {
if txin.PreviousOutpoint != op { // This really shouldn't ever happen...
continue log.Warnf("RPCS: Can't serialize tx: %v", err)
return
}
txStr = string(buf.Bytes())
} }
reply := &btcjson.Reply{ reply := &btcjson.Reply{
Result: struct { Result: struct {
TxHash string `json:"txhash"` TxHash string `json:"txhash"`
Index uint32 `json:"index"` Index uint32 `json:"index"`
SpendingTx string `json:"spender,omitempty"`
}{ }{
TxHash: op.Hash.String(), TxHash: txhash.String(),
Index: uint32(op.Index), Index: index,
SpendingTx: txStr,
}, },
Error: nil, Error: nil,
Id: &id, Id: &ctx.id,
} }
replyBytes, err := json.Marshal(reply) replyBytes, err := json.Marshal(reply)
if err != nil { if err != nil {
log.Errorf("RPCS: Unable to marshal spent notification: %v", err) log.Errorf("RPCS: Unable to marshal spent notification: %v", err)
continue return
} }
wltNtfn <- replyBytes ctx.connection <- replyBytes
s.ws.requests.RemoveSpentRequest(wltNtfn, &op) }
// newBlockNotifyCheckTxIn is a helper function to iterate through
// each transaction input of a new block and perform any checks and
// notify listening frontends when necessary.
func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) {
for _, txin := range tx.MsgTx().TxIn {
if clist, ok := s.ws.spentNotifications[txin.PreviousOutpoint]; ok {
for e := clist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
notifySpentData(ctx, &txin.PreviousOutpoint.Hash,
uint32(txin.PreviousOutpoint.Index), tx)
s.ws.RemoveSpentRequest(ctx.connection, ctx.rc,
&txin.PreviousOutpoint)
} }
} }
} }
@ -1271,15 +1421,18 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) {
// newBlockNotifyCheckTxOut is a helper function to iterate through // newBlockNotifyCheckTxOut is a helper function to iterate through
// each transaction output of a new block and perform any checks and // each transaction output of a new block and perform any checks and
// notify listening frontends when necessary. // notify listening frontends when necessary.
func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.Tx, spent []bool) { func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.Tx) {
for wltNtfn, cxt := range s.ws.requests.m {
for i, txout := range tx.MsgTx().TxOut { for i, txout := range tx.MsgTx().TxOut {
_, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript) _, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript)
if err != nil { if err != nil {
log.Debug("Error getting payment address from tx; dropping any Tx notifications.") log.Debug("Error getting payment address from tx; dropping any Tx notifications.")
break break
} }
if id, ok := cxt.txRequests[string(txaddrhash)]; ok { if idlist, ok := s.ws.txNotifications[string(txaddrhash)]; ok {
for e := idlist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
blkhash, err := block.Sha() blkhash, err := block.Sha()
if err != nil { if err != nil {
log.Error("Error getting block sha; dropping Tx notification.") log.Error("Error getting block sha; dropping Tx notification.")
@ -1300,7 +1453,6 @@ func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.T
Index uint32 `json:"index"` Index uint32 `json:"index"`
Amount int64 `json:"amount"` Amount int64 `json:"amount"`
PkScript string `json:"pkscript"` PkScript string `json:"pkscript"`
Spent bool `json:"spent"`
}{ }{
Sender: "Unknown", // TODO(jrick) Sender: "Unknown", // TODO(jrick)
Receiver: txaddr, Receiver: txaddr,
@ -1310,17 +1462,16 @@ func (s *rpcServer) newBlockNotifyCheckTxOut(block *btcutil.Block, tx *btcutil.T
Index: uint32(i), Index: uint32(i),
Amount: txout.Value, Amount: txout.Value,
PkScript: btcutil.Base58Encode(txout.PkScript), PkScript: btcutil.Base58Encode(txout.PkScript),
Spent: spent[i],
}, },
Error: nil, Error: nil,
Id: &id, Id: &ctx.id,
} }
replyBytes, err := json.Marshal(reply) replyBytes, err := json.Marshal(reply)
if err != nil { if err != nil {
log.Errorf("RPCS: Unable to marshal tx notification: %v", err) log.Errorf("RPCS: Unable to marshal tx notification: %v", err)
continue continue
} }
wltNtfn <- replyBytes ctx.connection <- replyBytes
} }
} }
} }