Change websocket handlers to return results.

This is the first step to fixing #66.
This commit is contained in:
Josh Rickmar 2014-01-14 13:15:22 -05:00
parent 6222b1d8cc
commit 6abad1d8ac
2 changed files with 172 additions and 186 deletions

View file

@ -244,8 +244,8 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) {
} }
// initialize memory for websocket connections // initialize memory for websocket connections
rpc.ws.connections = make(map[walletChan]*requestContexts) rpc.ws.connections = make(map[ntfnChan]*requestContexts)
rpc.ws.walletNotificationMaster = make(chan []byte) rpc.ws.walletNotificationMaster = make(ntfnChan)
rpc.ws.txNotifications = make(map[string]*list.List) rpc.ws.txNotifications = make(map[string]*list.List)
rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List) rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List)
rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List) rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List)

View file

@ -20,9 +20,9 @@ import (
"sync" "sync"
) )
type walletChan chan []byte type ntfnChan chan btcjson.Cmd
type wsCommandHandler func(*rpcServer, btcjson.Cmd, walletChan) error type wsCommandHandler func(*rpcServer, btcjson.Cmd, ntfnChan) (interface{}, *btcjson.Error)
// wsHandlers maps RPC command strings to appropriate websocket handler // wsHandlers maps RPC command strings to appropriate websocket handler
// functions. // functions.
@ -42,11 +42,11 @@ type wsContext struct {
// connections holds a map of requests for each wallet using the // connections holds a map of requests for each wallet using the
// wallet channel as the key. // wallet channel as the key.
connections map[walletChan]*requestContexts connections map[ntfnChan]*requestContexts
// Any chain notifications meant to be received by every connected // Any chain notifications meant to be received by every connected
// wallet are sent across this channel. // wallet are sent across this channel.
walletNotificationMaster walletChan walletNotificationMaster ntfnChan
// Map of address hash to list of notificationCtx. This is the global // Map of address hash to list of notificationCtx. This is the global
// list we actually use for notifications, we also keep a list in the // list we actually use for notifications, we also keep a list in the
@ -62,7 +62,7 @@ type wsContext struct {
} }
// AddTxRequest adds the request context for new transaction notifications. // AddTxRequest adds the request context for new transaction notifications.
func (r *wsContext) AddTxRequest(wallet walletChan, addr string) { func (r *wsContext) AddTxRequest(n ntfnChan, addr string) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -72,19 +72,19 @@ func (r *wsContext) AddTxRequest(wallet walletChan, addr string) {
r.txNotifications[addr] = clist r.txNotifications[addr] = clist
} }
clist.PushBack(wallet) clist.PushBack(n)
rc := r.connections[wallet] rc := r.connections[n]
rc.txRequests[addr] = struct{}{} rc.txRequests[addr] = struct{}{}
} }
func (r *wsContext) removeGlobalTxRequest(wallet walletChan, addr string) { func (r *wsContext) removeGlobalTxRequest(n ntfnChan, addr string) {
clist := r.txNotifications[addr] clist := r.txNotifications[addr]
var enext *list.Element var enext *list.Element
for e := clist.Front(); e != nil; e = enext { for e := clist.Front(); e != nil; e = enext {
enext = e.Next() enext = e.Next()
c := e.Value.(walletChan) c := e.Value.(ntfnChan)
if c == wallet { if c == n {
clist.Remove(e) clist.Remove(e)
break break
} }
@ -97,7 +97,7 @@ func (r *wsContext) removeGlobalTxRequest(wallet walletChan, addr string) {
// AddSpentRequest adds a request context for notifications of a spent // AddSpentRequest adds a request context for notifications of a spent
// Outpoint. // Outpoint.
func (r *wsContext) AddSpentRequest(wallet walletChan, op *btcwire.OutPoint) { func (r *wsContext) AddSpentRequest(n ntfnChan, op *btcwire.OutPoint) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -106,19 +106,19 @@ func (r *wsContext) AddSpentRequest(wallet walletChan, op *btcwire.OutPoint) {
clist = list.New() clist = list.New()
r.spentNotifications[*op] = clist r.spentNotifications[*op] = clist
} }
clist.PushBack(wallet) clist.PushBack(n)
rc := r.connections[wallet] rc := r.connections[n]
rc.spentRequests[*op] = struct{}{} rc.spentRequests[*op] = struct{}{}
} }
func (r *wsContext) removeGlobalSpentRequest(wallet walletChan, op *btcwire.OutPoint) { func (r *wsContext) removeGlobalSpentRequest(n ntfnChan, op *btcwire.OutPoint) {
clist := r.spentNotifications[*op] clist := r.spentNotifications[*op]
var enext *list.Element var enext *list.Element
for e := clist.Front(); e != nil; e = enext { for e := clist.Front(); e != nil; e = enext {
enext = e.Next() enext = e.Next()
c := e.Value.(walletChan) c := e.Value.(ntfnChan)
if c == wallet { if c == n {
clist.Remove(e) clist.Remove(e)
break break
} }
@ -131,18 +131,18 @@ func (r *wsContext) removeGlobalSpentRequest(wallet walletChan, op *btcwire.OutP
// RemoveSpentRequest removes a request context for notifications of a // RemoveSpentRequest removes a request context for notifications of a
// spent Outpoint. // spent Outpoint.
func (r *wsContext) RemoveSpentRequest(wallet walletChan, op *btcwire.OutPoint) { func (r *wsContext) RemoveSpentRequest(n ntfnChan, op *btcwire.OutPoint) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.removeGlobalSpentRequest(wallet, op) r.removeGlobalSpentRequest(n, op)
rc := r.connections[wallet] rc := r.connections[n]
delete(rc.spentRequests, *op) delete(rc.spentRequests, *op)
} }
// AddMinedTxRequest adds request contexts for notifications of a // AddMinedTxRequest adds request contexts for notifications of a
// mined transaction. // mined transaction.
func (r *wsContext) AddMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) { func (r *wsContext) AddMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -151,19 +151,19 @@ func (r *wsContext) AddMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash)
clist = list.New() clist = list.New()
r.minedTxNotifications[*txID] = clist r.minedTxNotifications[*txID] = clist
} }
clist.PushBack(wallet) clist.PushBack(n)
rc := r.connections[wallet] rc := r.connections[n]
rc.minedTxRequests[*txID] = struct{}{} rc.minedTxRequests[*txID] = struct{}{}
} }
func (r *wsContext) removeGlobalMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) { func (r *wsContext) removeGlobalMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) {
clist := r.minedTxNotifications[*txID] clist := r.minedTxNotifications[*txID]
var enext *list.Element var enext *list.Element
for e := clist.Front(); e != nil; e = enext { for e := clist.Front(); e != nil; e = enext {
enext = e.Next() enext = e.Next()
c := e.Value.(walletChan) c := e.Value.(ntfnChan)
if c == wallet { if c == n {
clist.Remove(e) clist.Remove(e)
break break
} }
@ -176,30 +176,30 @@ func (r *wsContext) removeGlobalMinedTxRequest(wallet walletChan, txID *btcwire.
// RemoveMinedTxRequest removes request contexts for notifications of a // RemoveMinedTxRequest removes request contexts for notifications of a
// mined transaction. // mined transaction.
func (r *wsContext) RemoveMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) { func (r *wsContext) RemoveMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.removeMinedTxRequest(wallet, txID) r.removeMinedTxRequest(n, txID)
} }
// removeMinedTxRequest removes request contexts for notifications of a // removeMinedTxRequest removes request contexts for notifications of a
// mined transaction without grabbing any locks. // mined transaction without grabbing any locks.
func (r *wsContext) removeMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) { func (r *wsContext) removeMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) {
r.removeGlobalMinedTxRequest(wallet, txID) r.removeGlobalMinedTxRequest(n, txID)
rc := r.connections[wallet] rc := r.connections[n]
delete(rc.minedTxRequests, *txID) delete(rc.minedTxRequests, *txID)
} }
// CloseListeners removes all request contexts for notifications sent // CloseListeners removes all request contexts for notifications sent
// to a wallet notification channel and closes the channel to stop all // to a wallet notification channel and closes the channel to stop all
// goroutines currently serving that wallet. // goroutines currently serving that wallet.
func (r *wsContext) CloseListeners(wallet walletChan) { func (r *wsContext) CloseListeners(n ntfnChan) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
delete(r.connections, wallet) delete(r.connections, n)
close(wallet) close(n)
} }
// requestContexts holds all requests for a single wallet connection. // requestContexts holds all requests for a single wallet connection.
@ -226,158 +226,126 @@ type requestContexts struct {
// extension JSON-RPC command and runs the proper handler to reply to // extension JSON-RPC command and runs the proper handler to reply to
// the command. Any and all responses are sent to the wallet from // the command. Any and all responses are sent to the wallet from
// this function. // this function.
func respondToAnyCmd(cmd btcjson.Cmd, s *rpcServer, wallet walletChan) { func respondToAnyCmd(cmd btcjson.Cmd, s *rpcServer, n ntfnChan) *btcjson.Reply {
// Lookup the websocket extension for the command and if it doesn't // Lookup the websocket extension for the command and if it doesn't
// exist fallback to handling the command as a standard command. // exist fallback to handling the command as a standard command.
wsHandler, ok := wsHandlers[cmd.Method()] wsHandler, ok := wsHandlers[cmd.Method()]
if !ok { if !ok {
reply := standardCmdReply(cmd, s) // No websocket-specific handler so handle like a legacy
mreply, _ := json.Marshal(reply) // RPC connection.
wallet <- mreply response := standardCmdReply(cmd, s)
return return &response
} }
result, jsonErr := wsHandler(s, cmd, n)
// Call the appropriate handler which responds unless there was an id := cmd.Id()
// error in which case the error is marshalled and sent here. response := btcjson.Reply{
if err := wsHandler(s, cmd, wallet); err != nil { Id: &id,
var reply btcjson.Reply Result: result,
jsonErr, ok := err.(btcjson.Error) Error: jsonErr,
if ok {
reply.Error = &jsonErr
mreply, _ := json.Marshal(reply)
wallet <- mreply
return
}
// In the case where we did not have a btcjson
// error to begin with, make a new one to send,
// but this really should not happen.
jsonErr = btcjson.Error{
Code: btcjson.ErrInternal.Code,
Message: err.Error(),
}
reply.Error = &jsonErr
mreply, _ := json.Marshal(reply)
wallet <- mreply
} }
return &response
} }
// handleGetCurrentNet implements the getcurrentnet command extension // handleGetCurrentNet implements the getcurrentnet command extension
// for websocket connections. // for websocket connections.
func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error { func handleGetCurrentNet(s *rpcServer, icmd btcjson.Cmd, ntfns ntfnChan) (interface{}, *btcjson.Error) {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
var net btcwire.BitcoinNet
if cfg.TestNet3 { if cfg.TestNet3 {
net = btcwire.TestNet3 return btcwire.TestNet3, nil
} else {
net = btcwire.MainNet
} }
return btcwire.MainNet, nil
reply.Result = float64(net)
mreply, _ := json.Marshal(reply)
wallet <- mreply
return nil
} }
// handleGetBestBlock implements the getbestblock command extension // handleGetBestBlock implements the getbestblock command extension
// for websocket connections. // for websocket connections.
func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error { func handleGetBestBlock(s *rpcServer, icmd btcjson.Cmd, ntfns ntfnChan) (interface{}, *btcjson.Error) {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
// All other "get block" commands give either the height, the // All other "get block" commands give either the height, the
// hash, or both but require the block SHA. This gets both for // hash, or both but require the block SHA. This gets both for
// the best block. // the best block.
sha, height, err := s.server.db.NewestSha() sha, height, err := s.server.db.NewestSha()
if err != nil { if err != nil {
return btcjson.ErrBestBlockHash return nil, &btcjson.ErrBestBlockHash
} }
reply.Result = map[string]interface{}{ // TODO(jrick): need a btcws type for the result.
result := map[string]interface{}{
"hash": sha.String(), "hash": sha.String(),
"height": height, "height": height,
} }
mreply, _ := json.Marshal(reply) return result, nil
wallet <- mreply
return nil
} }
// handleNotifyNewTXs implements the notifynewtxs command extension for // handleNotifyNewTXs implements the notifynewtxs command extension for
// websocket connections. // websocket connections.
func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error { func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, ntfns ntfnChan) (interface{}, *btcjson.Error) {
id := cmd.Id() cmd, ok := icmd.(*btcws.NotifyNewTXsCmd)
reply := &btcjson.Reply{Id: &id}
notifyCmd, ok := cmd.(*btcws.NotifyNewTXsCmd)
if !ok { if !ok {
return btcjson.ErrInternal return nil, &btcjson.ErrInternal
} }
for _, addr := range notifyCmd.Addresses { for _, addrStr := range cmd.Addresses {
addr, err := btcutil.DecodeAddr(addr) addr, err := btcutil.DecodeAddr(addrStr)
if err != nil { if err != nil {
return fmt.Errorf("cannot decode address: %v", err) e := btcjson.Error{
Code: btcjson.ErrInvalidAddressOrKey.Code,
Message: fmt.Sprintf("Invalid address or key: %v", addrStr),
}
return nil, &e
} }
// TODO(jrick) Notifing for non-P2PKH addresses is currently // TODO(jrick) Notifing for non-P2PKH addresses is currently
// unsuported. // unsuported.
if _, ok := addr.(*btcutil.AddressPubKeyHash); !ok { if _, ok := addr.(*btcutil.AddressPubKeyHash); !ok {
return fmt.Errorf("address is not P2PKH: %v", addr.EncodeAddress()) e := btcjson.Error{
Code: btcjson.ErrInvalidAddressOrKey.Code,
Message: fmt.Sprintf("Invalid address or key: %v", addr.EncodeAddress()),
}
return nil, &e
} }
s.ws.AddTxRequest(wallet, addr.EncodeAddress()) s.ws.AddTxRequest(ntfns, addr.EncodeAddress())
} }
mreply, _ := json.Marshal(reply) return nil, nil
wallet <- mreply
return nil
} }
// handleNotifySpent implements the notifyspent command extension for // handleNotifySpent implements the notifyspent command extension for
// websocket connections. // websocket connections.
func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error { func handleNotifySpent(s *rpcServer, icmd btcjson.Cmd, ntfns ntfnChan) (interface{}, *btcjson.Error) {
id := cmd.Id() cmd, ok := icmd.(*btcws.NotifySpentCmd)
reply := &btcjson.Reply{Id: &id}
notifyCmd, ok := cmd.(*btcws.NotifySpentCmd)
if !ok { if !ok {
return btcjson.ErrInternal return nil, &btcjson.ErrInternal
} }
s.ws.AddSpentRequest(wallet, notifyCmd.OutPoint) s.ws.AddSpentRequest(ntfns, cmd.OutPoint)
mreply, _ := json.Marshal(reply) return nil, nil
wallet <- mreply
return nil
} }
// handleRescan implements the rescan command extension for websocket // handleRescan implements the rescan command extension for websocket
// connections. // connections.
func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error { func handleRescan(s *rpcServer, icmd btcjson.Cmd, ntfns ntfnChan) (interface{}, *btcjson.Error) {
rescanCmd, ok := cmd.(*btcws.RescanCmd) cmd, ok := icmd.(*btcws.RescanCmd)
if !ok { if !ok {
return btcjson.ErrInternal return nil, &btcjson.ErrInternal
} }
if len(rescanCmd.Addresses) == 1 { if len(cmd.Addresses) == 1 {
rpcsLog.Info("Beginning rescan for 1 address.") rpcsLog.Info("Beginning rescan for 1 address.")
} else { } else {
rpcsLog.Infof("Beginning rescan for %v addresses.", rpcsLog.Infof("Beginning rescan for %v addresses.",
len(rescanCmd.Addresses)) len(cmd.Addresses))
} }
minblock := int64(rescanCmd.BeginBlock) minblock := int64(cmd.BeginBlock)
maxblock := int64(rescanCmd.EndBlock) maxblock := int64(cmd.EndBlock)
// FetchHeightRange may not return a complete list of block shas for // FetchHeightRange may not return a complete list of block shas for
// the given range, so fetch range as many times as necessary. // the given range, so fetch range as many times as necessary.
for { for {
blkshalist, err := s.server.db.FetchHeightRange(minblock, blkshalist, err := s.server.db.FetchHeightRange(minblock, maxblock)
maxblock)
if err != nil { if err != nil {
return err rpcsLog.Errorf("Error looking up block range: %v", err)
return nil, &btcjson.ErrDatabase
} }
if len(blkshalist) == 0 { if len(blkshalist) == 0 {
break break
@ -386,9 +354,8 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
for i := range blkshalist { for i := range blkshalist {
blk, err := s.server.db.FetchBlockBySha(&blkshalist[i]) blk, err := s.server.db.FetchBlockBySha(&blkshalist[i])
if err != nil { if err != nil {
rpcsLog.Errorf("Error looking up block sha: %v", rpcsLog.Errorf("Error looking up block sha: %v", err)
err) return nil, &btcjson.ErrDatabase
return err
} }
for _, tx := range blk.Transactions() { for _, tx := range blk.Transactions() {
var txReply *btcdb.TxListReply var txReply *btcdb.TxListReply
@ -402,7 +369,7 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
for i, addr := range addrs { for i, addr := range addrs {
encodedAddr := addr.EncodeAddress() encodedAddr := addr.EncodeAddress()
if _, ok := rescanCmd.Addresses[encodedAddr]; ok { if _, ok := cmd.Addresses[encodedAddr]; ok {
// TODO(jrick): This lookup is expensive and can be avoided // TODO(jrick): This lookup is expensive and can be avoided
// if the wallet is sent the previous outpoints for all inputs // if the wallet is sent the previous outpoints for all inputs
// of the tx, so any can removed from the utxo set (since // of the tx, so any can removed from the utxo set (since
@ -422,7 +389,7 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
} }
ntfn := &btcws.ProcessedTxNtfn{ n := &btcws.ProcessedTxNtfn{
Receiver: encodedAddr, Receiver: encodedAddr,
Amount: txout.Value, Amount: txout.Value,
TxID: tx.Sha().String(), TxID: tx.Sha().String(),
@ -434,8 +401,7 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
BlockTime: blk.MsgBlock().Header.Timestamp.Unix(), BlockTime: blk.MsgBlock().Header.Timestamp.Unix(),
Spent: txReply.TxSpent[txOutIdx], Spent: txReply.TxSpent[txOutIdx],
} }
mntfn, _ := ntfn.MarshalJSON() ntfns <- n
wallet <- mntfn
} }
} }
} }
@ -450,25 +416,26 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
} }
rpcsLog.Info("Finished rescan") rpcsLog.Info("Finished rescan")
return nil, nil
id := cmd.Id()
response := &btcjson.Reply{
Id: &id,
Result: nil,
Error: nil,
}
mresponse, _ := json.Marshal(response)
wallet <- mresponse
return nil
} }
// handleWalletSendRawTransaction implements the websocket extended version of // handleWalletSendRawTransaction implements the websocket extended version of
// the sendrawtransaction command. // the sendrawtransaction command.
func handleWalletSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error { func handleWalletSendRawTransaction(s *rpcServer, icmd btcjson.Cmd, n ntfnChan) (interface{}, *btcjson.Error) {
result, err := handleSendRawTransaction(s, cmd) result, err := handleSendRawTransaction(s, icmd)
// TODO: the standard handlers really should be changed to
// return btcjson.Errors which get used directly in the
// response. Wouldn't need this crap here then.
var jsonErr *btcjson.Error
if jsonErr, ok := err.(*btcjson.Error); ok {
return result, jsonErr
}
jsonErr = &btcjson.Error{
Code: btcjson.ErrMisc.Code,
Message: err.Error(),
}
if err != nil { if err != nil {
return err return result, jsonErr
} }
// The result is already guaranteed to be a valid hash string if no // The result is already guaranteed to be a valid hash string if no
@ -476,39 +443,39 @@ func handleWalletSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, wallet wallet
txSha, _ := btcwire.NewShaHashFromStr(result.(string)) txSha, _ := btcwire.NewShaHashFromStr(result.(string))
// Request to be notified when the transaction is mined. // Request to be notified when the transaction is mined.
s.ws.AddMinedTxRequest(wallet, txSha) s.ws.AddMinedTxRequest(n, txSha)
return nil return result, nil
} }
// AddWalletListener adds a channel to listen for new messages from a // AddWalletListener adds a channel to listen for new messages from a
// wallet. // wallet.
func (s *rpcServer) AddWalletListener(c walletChan) { func (s *rpcServer) AddWalletListener(n ntfnChan) {
s.ws.Lock() s.ws.Lock()
rc := &requestContexts{ rc := &requestContexts{
txRequests: make(map[string]struct{}), txRequests: make(map[string]struct{}),
spentRequests: make(map[btcwire.OutPoint]struct{}), spentRequests: make(map[btcwire.OutPoint]struct{}),
minedTxRequests: make(map[btcwire.ShaHash]struct{}), minedTxRequests: make(map[btcwire.ShaHash]struct{}),
} }
s.ws.connections[c] = rc s.ws.connections[n] = rc
s.ws.Unlock() s.ws.Unlock()
} }
// RemoveWalletListener removes a wallet listener channel. // RemoveWalletListener removes a wallet listener channel.
func (s *rpcServer) RemoveWalletListener(c walletChan) { func (s *rpcServer) RemoveWalletListener(n ntfnChan) {
s.ws.Lock() s.ws.Lock()
rc := s.ws.connections[c] rc := s.ws.connections[n]
for k := range rc.txRequests { for k := range rc.txRequests {
s.ws.removeGlobalTxRequest(c, k) s.ws.removeGlobalTxRequest(n, k)
} }
for k := range rc.spentRequests { for k := range rc.spentRequests {
s.ws.removeGlobalSpentRequest(c, &k) s.ws.removeGlobalSpentRequest(n, &k)
} }
for k := range rc.minedTxRequests { for k := range rc.minedTxRequests {
s.ws.removeGlobalMinedTxRequest(c, &k) s.ws.removeGlobalMinedTxRequest(n, &k)
} }
delete(s.ws.connections, c) delete(s.ws.connections, n)
s.ws.Unlock() s.ws.Unlock()
} }
@ -539,9 +506,12 @@ func (s *rpcServer) walletListenerDuplicator() {
func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) { func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// Add wallet notification channel so this handler receives btcd chain // Add wallet notification channel so this handler receives btcd chain
// notifications. // notifications.
c := make(walletChan) n := make(ntfnChan)
s.AddWalletListener(c) s.AddWalletListener(n)
defer s.RemoveWalletListener(c) defer s.RemoveWalletListener(n)
// Channel for responses.
r := make(chan *btcjson.Reply)
// msgs is a channel for all messages received over the websocket. // msgs is a channel for all messages received over the websocket.
msgs := make(chan []byte) msgs := make(chan []byte)
@ -554,6 +524,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
case <-s.quit: case <-s.quit:
close(msgs) close(msgs)
return return
default: default:
var m []byte var m []byte
if err := websocket.Message.Receive(ws, &m); err != nil { if err := websocket.Message.Receive(ws, &m); err != nil {
@ -573,14 +544,32 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
return return
} }
// Handle request here. // Handle request here.
go s.websocketJSONHandler(c, m) go s.websocketJSONHandler(r, n, m)
case ntfn, _ := <-c:
// Send btcd notification to btcwallet instance over case response := <-r:
// websocket. // Marshal and send response.
if err := websocket.Message.Send(ws, ntfn); err != nil { mresp, err := json.Marshal(response)
if err != nil {
rpcsLog.Errorf("Error unmarshaling response: %v", err)
continue
}
if err := websocket.Message.Send(ws, mresp); err != nil {
// Wallet disconnected. // Wallet disconnected.
return return
} }
case ntfn := <-n:
// Marshal and send notification.
mntfn, err := ntfn.MarshalJSON()
if err != nil {
rpcsLog.Errorf("Error unmarshaling notification: %v", err)
continue
}
if err := websocket.Message.Send(ws, mntfn); err != nil {
// Wallet disconnected.
return
}
case <-s.quit: case <-s.quit:
// Server closed. // Server closed.
return return
@ -590,26 +579,30 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// websocketJSONHandler parses and handles a marshalled json message, // websocketJSONHandler parses and handles a marshalled json message,
// sending the marshalled reply to a wallet notification channel. // sending the marshalled reply to a wallet notification channel.
func (s *rpcServer) websocketJSONHandler(wallet walletChan, msg []byte) { func (s *rpcServer) websocketJSONHandler(r chan *btcjson.Reply, n ntfnChan, msg []byte) {
s.wg.Add(1) s.wg.Add(1)
defer s.wg.Done() defer s.wg.Done()
cmd, jsonErr := parseCmd(msg) cmd, jsonErr := parseCmd(msg)
if jsonErr != nil { if jsonErr != nil {
var reply btcjson.Reply var resp btcjson.Reply
if cmd != nil { if cmd != nil {
// Unmarshaling at least a valid JSON-RPC message succeeded. // Unmarshaling at least a valid JSON-RPC message succeeded.
// Use the provided id for errors. // Use the provided id for errors. Requests with no IDs
// should be ignored.
id := cmd.Id() id := cmd.Id()
reply.Id = &id if id == nil {
return
}
resp.Id = &id
} }
reply.Error = jsonErr resp.Error = jsonErr
mreply, _ := json.Marshal(reply) r <- &resp
wallet <- mreply
return return
} }
respondToAnyCmd(cmd, s, wallet) resp := respondToAnyCmd(cmd, s, n)
r <- resp
} }
// NotifyBlockConnected creates and marshalls a JSON message to notify // NotifyBlockConnected creates and marshalls a JSON message to notify
@ -623,10 +616,8 @@ func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
} }
// TODO: remove int32 type conversion. // TODO: remove int32 type conversion.
ntfn := btcws.NewBlockConnectedNtfn(hash.String(), ntfn := btcws.NewBlockConnectedNtfn(hash.String(), int32(block.Height()))
int32(block.Height())) s.ws.walletNotificationMaster <- ntfn
mntfn, _ := json.Marshal(ntfn)
s.ws.walletNotificationMaster <- mntfn
// Inform any interested parties about txs mined in this block. // Inform any interested parties about txs mined in this block.
s.ws.Lock() s.ws.Lock()
@ -635,16 +626,15 @@ func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
var enext *list.Element var enext *list.Element
for e := clist.Front(); e != nil; e = enext { for e := clist.Front(); e != nil; e = enext {
enext = e.Next() enext = e.Next()
c := e.Value.(walletChan) n := e.Value.(ntfnChan)
// TODO: remove int32 type conversion after // TODO: remove int32 type conversion after
// the int64 -> int32 switch is made. // the int64 -> int32 switch is made.
ntfn := btcws.NewTxMinedNtfn(tx.Sha().String(), ntfn := btcws.NewTxMinedNtfn(tx.Sha().String(),
hash.String(), int32(block.Height()), hash.String(), int32(block.Height()),
block.MsgBlock().Header.Timestamp.Unix(), block.MsgBlock().Header.Timestamp.Unix(),
tx.Index()) tx.Index())
mntfn, _ := json.Marshal(ntfn) n <- ntfn
c <- mntfn s.ws.removeMinedTxRequest(n, tx.Sha())
s.ws.removeMinedTxRequest(c, tx.Sha())
} }
} }
} }
@ -664,8 +654,7 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) {
// TODO: remove int32 type conversion. // TODO: remove int32 type conversion.
ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(), ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(),
int32(block.Height())) int32(block.Height()))
mntfn, _ := json.Marshal(ntfn) s.ws.walletNotificationMaster <- ntfn
s.ws.walletNotificationMaster <- mntfn
} }
// NotifyBlockTXs creates and marshals a JSON message to notify wallets // NotifyBlockTXs creates and marshals a JSON message to notify wallets
@ -678,7 +667,7 @@ func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) {
} }
} }
func notifySpentData(wallet walletChan, txhash *btcwire.ShaHash, index uint32, func notifySpentData(n ntfnChan, txhash *btcwire.ShaHash, index uint32,
spender *btcutil.Tx) { spender *btcutil.Tx) {
var buf bytes.Buffer var buf bytes.Buffer
@ -687,10 +676,8 @@ func notifySpentData(wallet walletChan, txhash *btcwire.ShaHash, index uint32,
spender.MsgTx().Serialize(&buf) spender.MsgTx().Serialize(&buf)
txStr := hex.EncodeToString(buf.Bytes()) txStr := hex.EncodeToString(buf.Bytes())
// TODO(jrick): create a new notification in btcws and use that.
ntfn := btcws.NewTxSpentNtfn(txhash.String(), int(index), txStr) ntfn := btcws.NewTxSpentNtfn(txhash.String(), int(index), txStr)
mntfn, _ := ntfn.MarshalJSON() n <- ntfn
wallet <- mntfn
} }
// newBlockNotifyCheckTxIn is a helper function to iterate through // newBlockNotifyCheckTxIn is a helper function to iterate through
@ -702,10 +689,10 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) {
var enext *list.Element var enext *list.Element
for e := clist.Front(); e != nil; e = enext { for e := clist.Front(); e != nil; e = enext {
enext = e.Next() enext = e.Next()
c := e.Value.(walletChan) n := e.Value.(ntfnChan)
notifySpentData(c, &txin.PreviousOutpoint.Hash, notifySpentData(n, &txin.PreviousOutpoint.Hash,
txin.PreviousOutpoint.Index, tx) txin.PreviousOutpoint.Index, tx)
s.ws.RemoveSpentRequest(c, &txin.PreviousOutpoint) s.ws.RemoveSpentRequest(n, &txin.PreviousOutpoint)
} }
} }
} }
@ -731,7 +718,7 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
encodedAddr := addr.EncodeAddress() encodedAddr := addr.EncodeAddress()
if idlist, ok := s.ws.txNotifications[encodedAddr]; ok { if idlist, ok := s.ws.txNotifications[encodedAddr]; ok {
for e := idlist.Front(); e != nil; e = e.Next() { for e := idlist.Front(); e != nil; e = e.Next() {
wallet := e.Value.(walletChan) n := e.Value.(ntfnChan)
ntfn := &btcws.ProcessedTxNtfn{ ntfn := &btcws.ProcessedTxNtfn{
Receiver: encodedAddr, Receiver: encodedAddr,
@ -759,8 +746,7 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
ntfn.BlockIndex = -1 ntfn.BlockIndex = -1
} }
mntfn, _ := ntfn.MarshalJSON() n <- ntfn
wallet <- mntfn
} }
} }
} }