lbcwallet/rpc/legacyrpc/server.go
Josh Rickmar b480a0a09d Set account field in listtransactions result.
This field is only set for non-"send" categories since the wallet does
not track a "from account" like Core's wallet does.

Fixes #353.
2016-02-06 14:59:32 -05:00

971 lines
27 KiB
Go

/*
* Copyright (c) 2013-2015 The btcsuite developers
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package legacyrpc
import (
"crypto/subtle"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/btcsuite/fastsha256"
"github.com/btcsuite/websocket"
)
// websocketClient holds the per-connection state of a single websocket RPC
// client.
type websocketClient struct {
conn *websocket.Conn
// authenticated records whether the client has successfully
// authenticated, either at upgrade time or through a later authenticate
// request.
authenticated bool
remoteAddr string
// allRequests carries raw request messages from the read goroutine to
// the respond goroutine.
allRequests chan []byte
// responses carries marshaled responses and notifications to the send
// goroutine.
responses chan []byte
quit chan struct{} // closed on disconnect
// wg tracks the request handler goroutines started for this client.
wg sync.WaitGroup
}
// newWebsocketClient wraps a websocket connection in a client value with
// freshly created, unbuffered request, response, and quit channels.
func newWebsocketClient(c *websocket.Conn, authenticated bool, remoteAddr string) *websocketClient {
	client := new(websocketClient)
	client.conn = c
	client.authenticated = authenticated
	client.remoteAddr = remoteAddr
	client.allRequests = make(chan []byte)
	client.responses = make(chan []byte)
	client.quit = make(chan struct{})
	return client
}
// send hands b to the client's responses channel. It blocks until the message
// is accepted or the client's quit channel is closed; in the latter case an
// error is returned.
func (c *websocketClient) send(b []byte) error {
	select {
	case <-c.quit:
		return errors.New("websocket client disconnected")
	case c.responses <- b:
	}
	return nil
}
// Server holds the items the RPC server may need to access (auth,
// config, shutdown, etc.)
type Server struct {
httpServer http.Server
wallet *wallet.Wallet
walletLoader *wallet.Loader
chainClient *chain.RPCClient
handlerLookup func(string) (requestHandler, bool)
// handlerMu is held while the wallet and chainClient fields are set or
// copied for request handling and shutdown.
handlerMu sync.Mutex
listeners []net.Listener
// authsha is the SHA-256 digest of the expected HTTP Basic
// authorization string; supplied credentials are hashed and compared
// against it.
authsha [fastsha256.Size]byte
upgrader websocket.Upgrader
maxPostClients int64 // Max concurrent HTTP POST clients.
maxWebsocketClients int64 // Max concurrent websocket clients.
// Channels to register or unregister a websocket client for
// websocket notifications.
registerWSC chan *websocketClient
unregisterWSC chan *websocketClient
// Channels read from other components from which notifications are
// created.
connectedBlocks <-chan wtxmgr.BlockMeta
disconnectedBlocks <-chan wtxmgr.BlockMeta
relevantTxs <-chan chain.RelevantTx
managerLocked <-chan bool
confirmedBalance <-chan btcutil.Amount
unconfirmedBalance <-chan btcutil.Amount
//chainServerConnected <-chan bool
// registerWalletNtfns signals the notification listener that a wallet
// has been associated and its notification channels should be
// registered.
registerWalletNtfns chan struct{}
// enqueueNotification and dequeueNotification handle both sides of an
// infinitely growing queue for websocket client notifications.
enqueueNotification chan wsClientNotification
dequeueNotification chan wsClientNotification
// notificationHandlerQuit is closed when the notification handler
// goroutine shuts down. After this is closed, no more notifications
// will be sent to any websocket client response channel.
notificationHandlerQuit chan struct{}
// wg tracks the server's goroutines and in-flight POST requests; Stop
// waits on it.
wg sync.WaitGroup
// quit is closed by Stop to signal all goroutines to exit. quitMtx
// serializes the check-and-close of quit.
quit chan struct{}
quitMtx sync.Mutex
// requestShutdownChan receives a value when a client issues a stop
// request.
requestShutdownChan chan struct{}
}
// jsonAuthFail rejects a request whose HTTP authentication was refused. It
// adds a Basic authentication challenge header and replies with a 401 status
// and message.
func jsonAuthFail(w http.ResponseWriter) {
	const challenge = `Basic realm="btcwallet RPC"`
	hdr := w.Header()
	hdr.Add("WWW-Authenticate", challenge)
	http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
}
// NewServer creates a new server for serving legacy RPC client connections,
// both HTTP POST and websocket.
//
// The returned server has its notification goroutines running and is already
// serving on every listener passed in.
func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Listener) *Server {
	const rpcAuthTimeoutSeconds = 10

	mux := http.NewServeMux()
	srv := &Server{
		httpServer: http.Server{
			Handler: mux,
			// Connections that fail to complete the initial
			// handshake within this window are timed out.
			ReadTimeout: rpcAuthTimeoutSeconds * time.Second,
		},
		walletLoader:        walletLoader,
		maxPostClients:      opts.MaxPOSTClients,
		maxWebsocketClients: opts.MaxWebsocketClients,
		listeners:           listeners,
		// Only a hash of the HTTP basic auth string is kept; it is
		// used for a constant time comparison.
		authsha: fastsha256.Sum256(httpBasicAuth(opts.Username, opts.Password)),
		upgrader: websocket.Upgrader{
			// Allow all origins.
			CheckOrigin: func(r *http.Request) bool { return true },
		},
		registerWSC:             make(chan *websocketClient),
		unregisterWSC:           make(chan *websocketClient),
		registerWalletNtfns:     make(chan struct{}),
		enqueueNotification:     make(chan wsClientNotification),
		dequeueNotification:     make(chan wsClientNotification),
		notificationHandlerQuit: make(chan struct{}),
		quit:                    make(chan struct{}),
		requestShutdownChan:     make(chan struct{}, 1),
	}

	// handlePost serves a single JSON-RPC request sent by HTTP POST.
	// Authentication is mandatory for these clients.
	handlePost := func(w http.ResponseWriter, r *http.Request) {
		hdr := w.Header()
		hdr.Set("Connection", "close")
		hdr.Set("Content-Type", "application/json")
		r.Close = true

		if authErr := srv.checkAuthHeader(r); authErr != nil {
			log.Warnf("Unauthorized client connection attempt")
			jsonAuthFail(w)
			return
		}
		srv.wg.Add(1)
		srv.PostClientRPC(w, r)
		srv.wg.Done()
	}

	// handleWebsocket upgrades the connection and serves it as a
	// websocket client.  A missing Authorization header is tolerated
	// (the client starts unauthenticated); a wrong one is not.
	handleWebsocket := func(w http.ResponseWriter, r *http.Request) {
		authErr := srv.checkAuthHeader(r)
		if authErr != nil && authErr != ErrNoAuth {
			// Auth was supplied but incorrect, rather than simply
			// missing, so terminate the connection immediately.
			log.Warnf("Disconnecting improperly authorized websocket client")
			jsonAuthFail(w)
			return
		}
		authenticated := authErr == nil

		conn, err := srv.upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Warnf("Cannot websocket upgrade client %s: %v",
				r.RemoteAddr, err)
			return
		}
		srv.websocketClientRPC(newWebsocketClient(conn, authenticated, r.RemoteAddr))
	}

	mux.Handle("/", throttledFn(opts.MaxPOSTClients, handlePost))
	mux.Handle("/ws", throttledFn(opts.MaxWebsocketClients, handleWebsocket))

	srv.wg.Add(3)
	go srv.notificationListener()
	go srv.notificationQueue()
	go srv.notificationHandler()

	for _, lis := range listeners {
		srv.serve(lis)
	}

	return srv
}
// httpBasicAuth builds the bytes of the HTTP Basic authentication header
// value for the given credentials:
//
//	"Basic " + base64(username + ":" + password)
//
// The base64 portion uses the standard, padded encoding.
func httpBasicAuth(username, password string) []byte {
	const prefix = "Basic "

	credentials := []byte(username + ":" + password)
	enc := base64.StdEncoding

	out := make([]byte, len(prefix)+enc.EncodedLen(len(credentials)))
	n := copy(out, prefix)
	enc.Encode(out[n:], credentials)
	return out
}
// serve starts serving HTTP POST and websocket RPC for the legacy JSON-RPC
// server on lis in a new goroutine tracked by the server's wait group.  It
// returns immediately rather than blocking on lis.Accept.
func (s *Server) serve(lis net.Listener) {
	s.wg.Add(1)
	run := func() {
		defer s.wg.Done()
		log.Infof("Listening on %s", lis.Addr())
		serveErr := s.httpServer.Serve(lis)
		log.Tracef("Finished serving RPC: %v", serveErr)
	}
	go run()
}
// RegisterWallet associates the legacy RPC server with the wallet.  This
// function must be called before any wallet RPCs can be called by clients.
//
// With the handler mutex held, the wallet is stored and the notification
// listener is signalled to register for the wallet's notifications.
func (s *Server) RegisterWallet(w *wallet.Wallet) {
	s.handlerMu.Lock()
	defer s.handlerMu.Unlock()

	s.wallet = w
	var signal struct{}
	s.registerWalletNtfns <- signal
}
// Stop gracefully shuts down the rpc server by stopping and disconnecting all
// clients, disconnecting the chain server connection, and closing the wallet's
// account files. This blocks until shutdown completes.
//
// If the quit channel has already been closed by an earlier call, Stop
// returns immediately without waiting.
func (s *Server) Stop() {
s.quitMtx.Lock()
// Return early if a previous call already closed the quit channel.
select {
case <-s.quit:
s.quitMtx.Unlock()
return
default:
}
// Stop the connected wallet and chain server, if any.
// The pointers are copied under handlerMu so they can be used after
// the mutex is released.
s.handlerMu.Lock()
wallet := s.wallet
chainClient := s.chainClient
s.handlerMu.Unlock()
if wallet != nil {
wallet.Stop()
}
if chainClient != nil {
chainClient.Stop()
}
// Stop all the listeners.
for _, listener := range s.listeners {
err := listener.Close()
if err != nil {
log.Errorf("Cannot close listener `%s`: %v",
listener.Addr(), err)
}
}
// Signal the remaining goroutines to stop.
close(s.quit)
// quitMtx is released before the blocking waits below.
s.quitMtx.Unlock()
// First wait for the wallet and chain server to stop, if they
// were ever set.
if wallet != nil {
wallet.WaitForShutdown()
}
if chainClient != nil {
chainClient.WaitForShutdown()
}
// Wait for all remaining goroutines to exit.
s.wg.Wait()
}
// SetChainServer sets the chain server client component needed to run a fully
// functional bitcoin wallet RPC server.  This can be called to enable RPC
// passthrough even before a loaded wallet is set, but the wallet's RPC client
// is preferred.
//
// The assignment is made with the handler mutex held.
func (s *Server) SetChainServer(chainClient *chain.RPCClient) {
	s.handlerMu.Lock()
	defer s.handlerMu.Unlock()
	s.chainClient = chainClient
}
// handlerClosure creates a closure function for handling requests of the given
// method.  This may be a request that is handled directly by btcwallet, or
// a chain server request that is handled by passing the request down to btcd.
//
// If a wallet is registered but no chain client has been set, the wallet's
// chain client is adopted and remembered on the server.
//
// NOTE: These handlers do not handle special cases, such as the authenticate
// method.  Each of these must be checked beforehand (the method is already
// known) and handled accordingly.
func (s *Server) handlerClosure(request *btcjson.Request) lazyHandler {
	// snapshot copies the wallet and chain client pointers for the
	// closure while the handler mutex is held, and releases the mutex
	// before the handler itself is built.
	snapshot := func() (*wallet.Wallet, *chain.RPCClient) {
		s.handlerMu.Lock()
		defer s.handlerMu.Unlock()

		if s.wallet != nil && s.chainClient == nil {
			s.chainClient = s.wallet.ChainClient()
		}
		return s.wallet, s.chainClient
	}

	w, client := snapshot()
	return lazyApplyHandler(request, w, client)
}
// ErrNoAuth represents an error where authentication could not succeed
// due to a missing Authorization HTTP header. It is a sentinel value that
// callers compare against to tell missing credentials apart from incorrect
// ones.
var ErrNoAuth = errors.New("no auth")
// checkAuthHeader validates the HTTP Basic authentication carried by the
// request r.  ErrNoAuth is returned when r has no Authorization header; a
// different non-nil error is returned when credentials were supplied but do
// not match; nil is returned on a match.
//
// Only the first Authorization value is considered.  Its SHA-256 digest is
// compared against the stored digest in constant time.
func (s *Server) checkAuthHeader(r *http.Request) error {
	values := r.Header["Authorization"]
	if len(values) < 1 {
		return ErrNoAuth
	}

	digest := fastsha256.Sum256([]byte(values[0]))
	if subtle.ConstantTimeCompare(digest[:], s.authsha[:]) == 1 {
		return nil
	}
	return errors.New("bad auth")
}
// throttledFn is the http.HandlerFunc flavour of throttled: it limits the
// number of concurrently active clients of f, answering with an HTTP 429
// once the threshold is crossed.
func throttledFn(threshold int64, f http.HandlerFunc) http.Handler {
	var handler http.Handler = f
	return throttled(threshold, handler)
}
// throttled returns a handler that forwards to h while at most threshold
// requests are being served concurrently.  Requests arriving beyond that are
// answered with an HTTP 429 and are not passed to h.
func throttled(threshold int64, h http.Handler) http.Handler {
	// inFlight counts requests currently inside the returned handler,
	// including ones that are about to be rejected.
	var inFlight int64

	limited := func(w http.ResponseWriter, r *http.Request) {
		count := atomic.AddInt64(&inFlight, 1)
		defer atomic.AddInt64(&inFlight, -1)

		if count > threshold {
			log.Warnf("Reached threshold of %d concurrent active clients", threshold)
			http.Error(w, "429 Too Many Requests", 429)
			return
		}
		h.ServeHTTP(w, r)
	}
	return http.HandlerFunc(limited)
}
// sanitizeRequest renders the request as a string that is safe to log.  For
// methods whose parameters may carry private keys, passphrases, or other
// secrets, the parameters are replaced by a count; for all other methods the
// parameters are included.
func sanitizeRequest(r *btcjson.Request) string {
	sensitive := false
	switch r.Method {
	case "encryptwallet",
		"importprivkey",
		"importwallet",
		"signrawtransaction",
		"walletpassphrase",
		"walletpassphrasechange":
		// Parameters of these methods are considered unsafe to log.
		sensitive = true
	}

	if !sensitive {
		return fmt.Sprintf(`{"id":%v,"method":"%s","params":%v}`,
			r.ID, r.Method, r.Params)
	}
	return fmt.Sprintf(`{"id":%v,"method":"%s","params":SANITIZED %d parameters}`,
		r.ID, r.Method, len(r.Params))
}
// idPointer converts a request ID into the pointer form used for response
// IDs: nil for a nil interface, otherwise a pointer to a copy of id.
//
// Pointers to interfaces are normally a sign of a mistake; this helper exists
// only to accommodate btcjson, which uses empty interface pointers for
// response IDs.
func idPointer(id interface{}) (p *interface{}) {
	if id == nil {
		return nil
	}
	return &id
}
// invalidAuth reports whether a websocket authenticate request must be
// rejected.  It returns true when the request cannot be parsed into an
// authenticate command, or when the supplied username and passphrase do not
// match the server credentials.  It returns false only for a well-formed
// request carrying the correct credentials.
//
// The credentials are compared as SHA-256 digests in constant time.
func (s *Server) invalidAuth(req *btcjson.Request) bool {
	cmd, err := btcjson.UnmarshalCmd(req)
	if err != nil {
		// A malformed authenticate request must never be treated as a
		// successful authentication: the caller marks the client as
		// authenticated whenever this function returns false.
		return true
	}
	authCmd, ok := cmd.(*btcjson.AuthenticateCmd)
	if !ok {
		return true
	}

	// Check credentials.
	login := authCmd.Username + ":" + authCmd.Passphrase
	auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
	authSha := fastsha256.Sum256([]byte(auth))
	return subtle.ConstantTimeCompare(authSha[:], s.authsha[:]) != 1
}
// websocketClientRead reads messages from the client's websocket connection
// and forwards each one on wsc.allRequests.  It is the only sender on that
// channel and closes it before returning, which happens when a read fails or
// when the client's quit channel is closed.
//
// Selecting on wsc.quit while forwarding prevents this goroutine from
// blocking forever on the unbuffered allRequests channel after the responding
// goroutine has exited.
func (s *Server) websocketClientRead(wsc *websocketClient) {
	for {
		_, request, err := wsc.conn.ReadMessage()
		if err != nil {
			if err != io.EOF && err != io.ErrUnexpectedEOF {
				log.Warnf("Websocket receive failed from client %s: %v",
					wsc.remoteAddr, err)
			}
			close(wsc.allRequests)
			return
		}

		select {
		case wsc.allRequests <- request:
		case <-wsc.quit:
			// No response can be delivered any more, so stop
			// reading and signal the end of the request stream.
			close(wsc.allRequests)
			return
		}
	}
}
// websocketClientRespond processes the requests received on wsc.allRequests
// until that channel is closed, the server quits, or the client must be
// disconnected. Unparsable requests from an authenticated client are
// answered with an invalid request error; from an unauthenticated client
// they end the loop. The authenticate and stop methods are handled inline,
// and every other method runs in its own goroutine tracked by wsc.wg.
//
// On exit the client is unregistered from notifications, outstanding handler
// goroutines are waited for, and wsc.responses is closed.
func (s *Server) websocketClientRespond(wsc *websocketClient) {
// A for-select with a read of the quit channel is used instead of a
// for-range to provide clean shutdown. This is necessary due to
// websocketClientRead (which sends to the allRequests chan) not closing
// allRequests during shutdown if the remote websocket client is still
// connected.
out:
for {
select {
case reqBytes, ok := <-wsc.allRequests:
if !ok {
// client disconnected
break out
}
var req btcjson.Request
err := json.Unmarshal(reqBytes, &req)
if err != nil {
if !wsc.authenticated {
// Disconnect immediately.
break out
}
resp := makeResponse(req.ID, nil,
btcjson.ErrRPCInvalidRequest)
mresp, err := json.Marshal(resp)
// We expect the marshal to succeed. If it
// doesn't, it indicates some non-marshalable
// type in the response.
if err != nil {
panic(err)
}
err = wsc.send(mresp)
if err != nil {
break out
}
continue
}
// An authenticate request is only accepted once, and only with
// credentials that invalidAuth does not reject.
if req.Method == "authenticate" {
if wsc.authenticated || s.invalidAuth(&req) {
// Disconnect immediately.
break out
}
wsc.authenticated = true
resp := makeResponse(req.ID, nil, nil)
// Expected to never fail.
mresp, err := json.Marshal(resp)
if err != nil {
panic(err)
}
err = wsc.send(mresp)
if err != nil {
break out
}
continue
}
// Every other method requires prior authentication.
if !wsc.authenticated {
// Disconnect immediately.
break out
}
switch req.Method {
case "stop":
resp := makeResponse(req.ID,
"btcwallet stopping.", nil)
mresp, err := json.Marshal(resp)
// Expected to never fail.
if err != nil {
panic(err)
}
// The reply is sent before shutdown is requested.
err = wsc.send(mresp)
if err != nil {
break out
}
s.requestProcessShutdown()
break
default:
req := req // Copy for the closure
f := s.handlerClosure(&req)
wsc.wg.Add(1)
// The handler runs in its own goroutine so this loop can keep
// receiving requests.
go func() {
resp, jsonErr := f()
mresp, err := btcjson.MarshalResponse(req.ID, resp, jsonErr)
if err != nil {
log.Errorf("Unable to marshal response: %v", err)
} else {
// A send error means the client is gone; the response is dropped.
_ = wsc.send(mresp)
}
wsc.wg.Done()
}()
}
case <-s.quit:
break out
}
}
// Remove websocket client from notification group, or if the server is
// shutting down, wait until the notification handler has finished
// running. This is needed to ensure that no more notifications will be
// sent to the client's responses chan before it's closed below.
select {
case s.unregisterWSC <- wsc:
case <-s.quit:
<-s.notificationHandlerQuit
}
// allow client to disconnect after all handler goroutines are done
wsc.wg.Wait()
close(wsc.responses)
s.wg.Done()
}
// websocketClientSend writes every message received on wsc.responses to the
// client's websocket connection as a text message, applying a two second
// write deadline to each.  The loop ends when the responses channel is
// closed, a write fails, or the server quits.  On exit the client's quit
// channel is closed and the server wait group is released.
func (s *Server) websocketClientSend(wsc *websocketClient) {
	const writeTimeout = 2 * time.Second

	// write pushes a single message to the client and reports whether
	// the send loop should keep running.
	write := func(msg []byte) bool {
		// Failing to set the deadline is logged but does not prevent
		// the write from being attempted.
		if err := wsc.conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
			log.Warnf("Cannot set write deadline on client %s: %v",
				wsc.remoteAddr, err)
		}
		if err := wsc.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
			log.Warnf("Failed websocket send to client %s: %v",
				wsc.remoteAddr, err)
			return false
		}
		return true
	}

	for running := true; running; {
		select {
		case msg, open := <-wsc.responses:
			// A closed channel means the client disconnected.
			running = open && write(msg)
		case <-s.quit:
			running = false
		}
	}

	close(wsc.quit)
	log.Infof("Disconnected websocket client %s", wsc.remoteAddr)
	s.wg.Done()
}
// websocketClientRPC serves JSON-RPC requests and notifications over a
// websocket connection for a single client.  It registers the client for
// notifications, starts the read, respond, and send goroutines, and then
// blocks until the client's quit channel is closed.  If the server is
// already quitting, it returns without starting anything.
func (s *Server) websocketClientRPC(wsc *websocketClient) {
	log.Infof("New websocket client %s", wsc.remoteAddr)

	// The zero time removes the read deadline that was set before the
	// websocket hijacked the connection.
	var noDeadline time.Time
	if deadlineErr := wsc.conn.SetReadDeadline(noDeadline); deadlineErr != nil {
		log.Warnf("Cannot remove read deadline: %v", deadlineErr)
	}

	// Register the client so that notifications duplicated to each
	// client also reach this one.
	select {
	case <-s.quit:
		return
	case s.registerWSC <- wsc:
	}

	// Only the respond and send goroutines are counted in the wait
	// group.  The read goroutine is deliberately left out so shutdown
	// does not hang on a blocked read of the websocket connection while
	// the client is still connected.
	s.wg.Add(2)
	go s.websocketClientRead(wsc)
	go s.websocketClientRespond(wsc)
	go s.websocketClientSend(wsc)

	<-wsc.quit
}
// maxRequestSize is the upper bound, in bytes, on the size of a request body
// read from a client: 4 MiB.
const maxRequestSize = 4 << 20
// PostClientRPC processes and replies to a JSON-RPC client request received
// by HTTP POST.
//
// The body is read up to maxRequestSize bytes.  An unparsable request is
// answered with an invalid request error.  An authenticate request is dropped
// without a JSON-RPC reply since it is invalid for HTTP POST clients.  A stop
// request is acknowledged and then triggers a process shutdown request.  Any
// other method is dispatched through the handler closure.
func (s *Server) PostClientRPC(w http.ResponseWriter, r *http.Request) {
	limited := http.MaxBytesReader(w, r.Body, maxRequestSize)
	payload, readErr := ioutil.ReadAll(limited)
	if readErr != nil {
		// TODO: what if the underlying reader errored?
		http.Error(w, "413 Request Too Large.",
			http.StatusRequestEntityTooLarge)
		return
	}

	var req btcjson.Request
	if parseErr := json.Unmarshal(payload, &req); parseErr != nil {
		reply, err := btcjson.MarshalResponse(req.ID, nil, btcjson.ErrRPCInvalidRequest)
		if err != nil {
			log.Errorf("Unable to marshal response: %v", err)
			http.Error(w, "500 Internal Server Error",
				http.StatusInternalServerError)
			return
		}
		if _, err = w.Write(reply); err != nil {
			log.Warnf("Cannot write invalid request request to client: %v", err)
		}
		return
	}

	// The authenticate method is invalid for HTTP POST clients.
	if req.Method == "authenticate" {
		// Drop it.
		return
	}

	// The stop method is answered directly; all other methods are either
	// handled by the wallet or passed to the chain server.
	isStop := req.Method == "stop"
	var (
		result interface{}
		rpcErr *btcjson.RPCError
	)
	if isStop {
		result = "btcwallet stopping"
	} else {
		result, rpcErr = s.handlerClosure(&req)()
	}

	// Marshal and send.
	encoded, err := btcjson.MarshalResponse(req.ID, result, rpcErr)
	if err != nil {
		log.Errorf("Unable to marshal response: %v", err)
		http.Error(w, "500 Internal Server Error", http.StatusInternalServerError)
		return
	}
	if _, err = w.Write(encoded); err != nil {
		log.Warnf("Unable to respond to client: %v", err)
	}

	// Shutdown is requested only after the reply has been written.
	if isStop {
		s.requestProcessShutdown()
	}
}
// requestProcessShutdown records a shutdown request without blocking.  If
// the shutdown channel cannot accept another value the call is a no-op.
func (s *Server) requestProcessShutdown() {
	var signal struct{}
	select {
	case s.requestShutdownChan <- signal:
		// Request queued.
	default:
		// Channel is not ready to receive; nothing more to do.
	}
}
// RequestProcessShutdown exposes, as a receive-only channel, the channel that
// is sent to when an authorized client requests remote shutdown.
func (s *Server) RequestProcessShutdown() <-chan struct{} {
	var shutdownRequests <-chan struct{} = s.requestShutdownChan
	return shutdownRequests
}
// Notification messages for websocket clients. Each concrete type below
// wraps the value received from a notification source and implements
// wsClientNotification to produce the commands sent to clients.
type (
wsClientNotification interface {
// This returns a slice only because some of these types result
// in multiple client notifications.
notificationCmds(w *wallet.Wallet) []interface{}
}
blockConnected wtxmgr.BlockMeta
blockDisconnected wtxmgr.BlockMeta
relevantTx chain.RelevantTx
managerLocked bool
confirmedBalance btcutil.Amount
unconfirmedBalance btcutil.Amount
btcdConnected bool
)
// notificationCmds returns a single block connected notification built from
// the block's hash string, height, and Unix timestamp.  The wallet argument
// is not used.
func (b blockConnected) notificationCmds(w *wallet.Wallet) []interface{} {
	hash, unixTime := b.Hash.String(), b.Time.Unix()
	return []interface{}{
		btcjson.NewBlockConnectedNtfn(hash, b.Height, unixTime),
	}
}
// notificationCmds returns a single block disconnected notification built
// from the block's hash string, height, and Unix timestamp.  The wallet
// argument is not used.
func (b blockDisconnected) notificationCmds(w *wallet.Wallet) []interface{} {
	hash, unixTime := b.Hash.String(), b.Time.Unix()
	return []interface{}{
		btcjson.NewBlockDisconnectedNtfn(hash, b.Height, unixTime),
	}
}
// notificationCmds looks up the details of the relevant transaction in the
// wallet's transaction store and returns one new transaction notification
// per list-transactions entry produced for it.  If the details cannot be
// fetched, or none exist, the problem is logged and nil is returned.
func (t relevantTx) notificationCmds(w *wallet.Wallet) []interface{} {
	syncedTo := w.Manager.SyncedTo()

	// A nil block is passed to the lookup when the transaction record
	// carries no block.
	var minedIn *wtxmgr.Block
	if t.Block != nil {
		minedIn = &t.Block.Block
	}

	txDetails, err := w.TxStore.UniqueTxDetails(&t.TxRecord.Hash, minedIn)
	switch {
	case err != nil:
		log.Errorf("Cannot fetch transaction details for client notification: %v", err)
		return nil
	case txDetails == nil:
		log.Errorf("No details found for client transaction notification")
		return nil
	}

	entries := wallet.ListTransactions(txDetails, w.Manager, syncedTo.Height,
		w.ChainParams())
	cmds := make([]interface{}, 0, len(entries))
	for _, entry := range entries {
		cmds = append(cmds, btcjson.NewNewTxNtfn(entry.Account, entry))
	}
	return cmds
}
// notificationCmds returns a single wallet lock state notification carrying
// the boolean value of l.  The wallet argument is not used.
func (l managerLocked) notificationCmds(w *wallet.Wallet) []interface{} {
	locked := bool(l)
	return []interface{}{btcjson.NewWalletLockStateNtfn(locked)}
}
// notificationCmds returns a single account balance notification for the
// empty account name, carrying the amount converted with ToBTC and the
// confirmed flag set to true.  The wallet argument is not used.
func (b confirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} {
	const confirmed = true
	balance := btcutil.Amount(b).ToBTC()
	return []interface{}{btcjson.NewAccountBalanceNtfn("", balance, confirmed)}
}
// notificationCmds returns a single account balance notification for the
// empty account name, carrying the amount converted with ToBTC and the
// confirmed flag set to false.  The wallet argument is not used.
func (b unconfirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} {
	const confirmed = false
	balance := btcutil.Amount(b).ToBTC()
	return []interface{}{btcjson.NewAccountBalanceNtfn("", balance, confirmed)}
}
// notificationCmds returns a single btcd connected notification carrying the
// boolean value of b.  The wallet argument is not used.
func (b btcdConnected) notificationCmds(w *wallet.Wallet) []interface{} {
	connected := bool(b)
	return []interface{}{btcjson.NewBtcdConnectedNtfn(connected)}
}
// notificationListener forwards values received from the wallet notification
// channels to the notification queue, wrapped in the matching notification
// type.  When signalled on registerWalletNtfns it registers for all wallet
// notifications.  It runs until the server quits, at which point it closes
// enqueueNotification and starts drainNotifications.
func (s *Server) notificationListener() {
	// registerAll subscribes to every wallet notification source.  The
	// server's channels are only replaced when all registrations
	// succeed; on the first failure the error is logged and the existing
	// channels are left untouched.
	//
	// Registration is done from this goroutine so that it doesn't
	// require another Server mutex.
	registerAll := func() {
		connected, err := s.wallet.ListenConnectedBlocks()
		if err != nil {
			log.Errorf("Could not register for new connected block notifications: %v", err)
			return
		}
		disconnected, err := s.wallet.ListenDisconnectedBlocks()
		if err != nil {
			log.Errorf("Could not register for new disconnected block notifications: %v", err)
			return
		}
		txs, err := s.wallet.ListenRelevantTxs()
		if err != nil {
			log.Errorf("Could not register for new relevant transaction notifications: %v", err)
			return
		}
		lockStatus, err := s.wallet.ListenLockStatus()
		if err != nil {
			log.Errorf("Could not register for manager lock state changes: %v", err)
			return
		}
		confirmed, err := s.wallet.ListenConfirmedBalance()
		if err != nil {
			log.Errorf("Could not register for confirmed balance changes: %v", err)
			return
		}
		unconfirmed, err := s.wallet.ListenUnconfirmedBalance()
		if err != nil {
			log.Errorf("Could not register for unconfirmed balance changes: %v", err)
			return
		}

		s.connectedBlocks = connected
		s.disconnectedBlocks = disconnected
		s.relevantTxs = txs
		s.managerLocked = lockStatus
		s.confirmedBalance = confirmed
		s.unconfirmedBalance = unconfirmed
	}

	for listening := true; listening; {
		select {
		case blk := <-s.connectedBlocks:
			s.enqueueNotification <- blockConnected(blk)
		case blk := <-s.disconnectedBlocks:
			s.enqueueNotification <- blockDisconnected(blk)
		case tx := <-s.relevantTxs:
			s.enqueueNotification <- relevantTx(tx)
		case locked := <-s.managerLocked:
			s.enqueueNotification <- managerLocked(locked)
		case bal := <-s.confirmedBalance:
			s.enqueueNotification <- confirmedBalance(bal)
		case bal := <-s.unconfirmedBalance:
			s.enqueueNotification <- unconfirmedBalance(bal)
		case <-s.registerWalletNtfns:
			registerAll()
		case <-s.quit:
			listening = false
		}
	}

	close(s.enqueueNotification)
	go s.drainNotifications()
	s.wg.Done()
}
// drainNotifications receives and discards values from every notification
// source channel and from registerWalletNtfns. It loops forever and never
// returns. notificationListener starts it on exit, so senders on these
// channels still have a receiver after the listener has stopped.
//
// NOTE(review): a channel field that was never registered is nil and its case
// is simply never selected. If a source channel is ever closed by its owner,
// this loop would presumably spin on that case; confirm against the wallet.
func (s *Server) drainNotifications() {
for {
select {
case <-s.connectedBlocks:
case <-s.disconnectedBlocks:
case <-s.relevantTxs:
case <-s.managerLocked:
case <-s.confirmedBalance:
case <-s.unconfirmedBalance:
case <-s.registerWalletNtfns:
}
}
}
// notificationQueue manages an infinitely-growing queue of notifications that
// wallet websocket clients may be interested in. It quits when the
// enqueueNotification channel is closed, dropping any still pending
// notifications.
//
// On exit it closes dequeueNotification.
func (s *Server) notificationQueue() {
var q []wsClientNotification
// dequeue and skipQueue both refer to s.dequeueNotification, but only
// one of them is non-nil at a time: skipQueue while the queue is empty,
// dequeue while it holds pending notifications. A send on the nil one
// never proceeds, which disables that path.
var dequeue chan<- wsClientNotification
skipQueue := s.dequeueNotification
// next is the notification at the head of the queue.
var next wsClientNotification
out:
for {
select {
case n, ok := <-s.enqueueNotification:
if !ok {
// Sender closed input channel.
break out
}
// Either send to out immediately if skipQueue is
// non-nil (queue is empty) and reader is ready,
// or append to the queue and send later.
select {
case skipQueue <- n:
default:
q = append(q, n)
dequeue = s.dequeueNotification
skipQueue = nil
next = q[0]
}
case dequeue <- next:
q[0] = nil // avoid leak
q = q[1:]
if len(q) == 0 {
// Queue drained: switch back to direct delivery.
dequeue = nil
skipQueue = s.dequeueNotification
} else {
next = q[0]
}
}
}
close(s.dequeueNotification)
s.wg.Done()
}
// notificationHandler keeps the set of registered websocket clients and sends
// every dequeued notification to each of them.  It runs until the
// dequeueNotification channel is closed or the server quits, then closes
// notificationHandlerQuit and releases the server wait group.
func (s *Server) notificationHandler() {
	// Registered clients, keyed by their quit channel.
	registered := make(map[chan struct{}]*websocketClient)

	// broadcast marshals each command produced by msg and sends it to
	// every registered client.  Clients whose send fails are removed.
	broadcast := func(msg wsClientNotification) {
		for _, cmd := range msg.notificationCmds(s.wallet) {
			payload, err := btcjson.MarshalCmd(nil, cmd)
			// All notifications are expected to be marshalable.
			if err != nil {
				panic(err)
			}
			for key, client := range registered {
				if client.send(payload) != nil {
					delete(registered, key)
				}
			}
		}
	}

	for active := true; active; {
		select {
		case wsc := <-s.registerWSC:
			registered[wsc.quit] = wsc

		case wsc := <-s.unregisterWSC:
			delete(registered, wsc.quit)

		case msg, ok := <-s.dequeueNotification:
			switch {
			case !ok:
				// No more notifications.
				active = false
			case len(registered) > 0:
				// Notifications are ignored when there are no
				// clients to receive them.
				broadcast(msg)
			}

		case <-s.quit:
			active = false
		}
	}

	close(s.notificationHandlerQuit)
	s.wg.Done()
}