959 lines
26 KiB
Go
959 lines
26 KiB
Go
// Copyright (c) 2013-2015 The btcsuite developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package legacyrpc
|
|
|
|
import (
|
|
"crypto/subtle"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcjson"
|
|
"github.com/btcsuite/btcutil"
|
|
"github.com/btcsuite/btcwallet/chain"
|
|
"github.com/btcsuite/btcwallet/wallet"
|
|
"github.com/btcsuite/btcwallet/wtxmgr"
|
|
"github.com/btcsuite/fastsha256"
|
|
"github.com/btcsuite/websocket"
|
|
)
|
|
|
|
type websocketClient struct {
|
|
conn *websocket.Conn
|
|
authenticated bool
|
|
remoteAddr string
|
|
allRequests chan []byte
|
|
responses chan []byte
|
|
quit chan struct{} // closed on disconnect
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func newWebsocketClient(c *websocket.Conn, authenticated bool, remoteAddr string) *websocketClient {
|
|
return &websocketClient{
|
|
conn: c,
|
|
authenticated: authenticated,
|
|
remoteAddr: remoteAddr,
|
|
allRequests: make(chan []byte),
|
|
responses: make(chan []byte),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (c *websocketClient) send(b []byte) error {
|
|
select {
|
|
case c.responses <- b:
|
|
return nil
|
|
case <-c.quit:
|
|
return errors.New("websocket client disconnected")
|
|
}
|
|
}
|
|
|
|
// Server holds the items the RPC server may need to access (auth,
|
|
// config, shutdown, etc.)
|
|
type Server struct {
|
|
httpServer http.Server
|
|
wallet *wallet.Wallet
|
|
walletLoader *wallet.Loader
|
|
chainClient *chain.RPCClient
|
|
handlerLookup func(string) (requestHandler, bool)
|
|
handlerMu sync.Mutex
|
|
|
|
listeners []net.Listener
|
|
authsha [fastsha256.Size]byte
|
|
upgrader websocket.Upgrader
|
|
|
|
maxPostClients int64 // Max concurrent HTTP POST clients.
|
|
maxWebsocketClients int64 // Max concurrent websocket clients.
|
|
|
|
// Channels to register or unregister a websocket client for
|
|
// websocket notifications.
|
|
registerWSC chan *websocketClient
|
|
unregisterWSC chan *websocketClient
|
|
|
|
// Channels read from other components from which notifications are
|
|
// created.
|
|
connectedBlocks <-chan wtxmgr.BlockMeta
|
|
disconnectedBlocks <-chan wtxmgr.BlockMeta
|
|
relevantTxs <-chan chain.RelevantTx
|
|
managerLocked <-chan bool
|
|
confirmedBalance <-chan btcutil.Amount
|
|
unconfirmedBalance <-chan btcutil.Amount
|
|
//chainServerConnected <-chan bool
|
|
registerWalletNtfns chan struct{}
|
|
|
|
// enqueueNotification and dequeueNotification handle both sides of an
|
|
// infinitly growing queue for websocket client notifications.
|
|
enqueueNotification chan wsClientNotification
|
|
dequeueNotification chan wsClientNotification
|
|
|
|
// notificationHandlerQuit is closed when the notification handler
|
|
// goroutine shuts down. After this is closed, no more notifications
|
|
// will be sent to any websocket client response channel.
|
|
notificationHandlerQuit chan struct{}
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
quitMtx sync.Mutex
|
|
|
|
requestShutdownChan chan struct{}
|
|
}
|
|
|
|
// jsonAuthFail sends a message back to the client if the http auth is rejected.
|
|
func jsonAuthFail(w http.ResponseWriter) {
|
|
w.Header().Add("WWW-Authenticate", `Basic realm="btcwallet RPC"`)
|
|
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
|
|
}
|
|
|
|
// NewServer creates a new server for serving legacy RPC client connections,
|
|
// both HTTP POST and websocket.
|
|
func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Listener) *Server {
|
|
serveMux := http.NewServeMux()
|
|
const rpcAuthTimeoutSeconds = 10
|
|
|
|
server := &Server{
|
|
httpServer: http.Server{
|
|
Handler: serveMux,
|
|
|
|
// Timeout connections which don't complete the initial
|
|
// handshake within the allowed timeframe.
|
|
ReadTimeout: time.Second * rpcAuthTimeoutSeconds,
|
|
},
|
|
walletLoader: walletLoader,
|
|
maxPostClients: opts.MaxPOSTClients,
|
|
maxWebsocketClients: opts.MaxWebsocketClients,
|
|
listeners: listeners,
|
|
// A hash of the HTTP basic auth string is used for a constant
|
|
// time comparison.
|
|
authsha: fastsha256.Sum256(httpBasicAuth(opts.Username, opts.Password)),
|
|
upgrader: websocket.Upgrader{
|
|
// Allow all origins.
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
},
|
|
registerWSC: make(chan *websocketClient),
|
|
unregisterWSC: make(chan *websocketClient),
|
|
registerWalletNtfns: make(chan struct{}),
|
|
enqueueNotification: make(chan wsClientNotification),
|
|
dequeueNotification: make(chan wsClientNotification),
|
|
notificationHandlerQuit: make(chan struct{}),
|
|
quit: make(chan struct{}),
|
|
requestShutdownChan: make(chan struct{}, 1),
|
|
}
|
|
|
|
serveMux.Handle("/", throttledFn(opts.MaxPOSTClients,
|
|
func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Connection", "close")
|
|
w.Header().Set("Content-Type", "application/json")
|
|
r.Close = true
|
|
|
|
if err := server.checkAuthHeader(r); err != nil {
|
|
log.Warnf("Unauthorized client connection attempt")
|
|
jsonAuthFail(w)
|
|
return
|
|
}
|
|
server.wg.Add(1)
|
|
server.PostClientRPC(w, r)
|
|
server.wg.Done()
|
|
}))
|
|
|
|
serveMux.Handle("/ws", throttledFn(opts.MaxWebsocketClients,
|
|
func(w http.ResponseWriter, r *http.Request) {
|
|
authenticated := false
|
|
switch server.checkAuthHeader(r) {
|
|
case nil:
|
|
authenticated = true
|
|
case ErrNoAuth:
|
|
// nothing
|
|
default:
|
|
// If auth was supplied but incorrect, rather than simply
|
|
// being missing, immediately terminate the connection.
|
|
log.Warnf("Disconnecting improperly authorized " +
|
|
"websocket client")
|
|
jsonAuthFail(w)
|
|
return
|
|
}
|
|
|
|
conn, err := server.upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Warnf("Cannot websocket upgrade client %s: %v",
|
|
r.RemoteAddr, err)
|
|
return
|
|
}
|
|
wsc := newWebsocketClient(conn, authenticated, r.RemoteAddr)
|
|
server.websocketClientRPC(wsc)
|
|
}))
|
|
|
|
server.wg.Add(3)
|
|
go server.notificationListener()
|
|
go server.notificationQueue()
|
|
go server.notificationHandler()
|
|
|
|
for _, lis := range listeners {
|
|
server.serve(lis)
|
|
}
|
|
|
|
return server
|
|
}
|
|
|
|
// httpBasicAuth returns the UTF-8 bytes of the HTTP Basic authentication
|
|
// string:
|
|
//
|
|
// "Basic " + base64(username + ":" + password)
|
|
func httpBasicAuth(username, password string) []byte {
|
|
const header = "Basic "
|
|
base64 := base64.StdEncoding
|
|
|
|
b64InputLen := len(username) + len(":") + len(password)
|
|
b64Input := make([]byte, 0, b64InputLen)
|
|
b64Input = append(b64Input, username...)
|
|
b64Input = append(b64Input, ':')
|
|
b64Input = append(b64Input, password...)
|
|
|
|
output := make([]byte, len(header)+base64.EncodedLen(b64InputLen))
|
|
copy(output, header)
|
|
base64.Encode(output[len(header):], b64Input)
|
|
return output
|
|
}
|
|
|
|
// serve serves HTTP POST and websocket RPC for the legacy JSON-RPC RPC server.
|
|
// This function does not block on lis.Accept.
|
|
func (s *Server) serve(lis net.Listener) {
|
|
s.wg.Add(1)
|
|
go func() {
|
|
log.Infof("Listening on %s", lis.Addr())
|
|
err := s.httpServer.Serve(lis)
|
|
log.Tracef("Finished serving RPC: %v", err)
|
|
s.wg.Done()
|
|
}()
|
|
}
|
|
|
|
// RegisterWallet associates the legacy RPC server with the wallet. This
|
|
// function must be called before any wallet RPCs can be called by clients.
|
|
func (s *Server) RegisterWallet(w *wallet.Wallet) {
|
|
s.handlerMu.Lock()
|
|
s.wallet = w
|
|
s.registerWalletNtfns <- struct{}{}
|
|
s.handlerMu.Unlock()
|
|
}
|
|
|
|
// Stop gracefully shuts down the rpc server by stopping and disconnecting all
|
|
// clients, disconnecting the chain server connection, and closing the wallet's
|
|
// account files. This blocks until shutdown completes.
|
|
func (s *Server) Stop() {
|
|
s.quitMtx.Lock()
|
|
select {
|
|
case <-s.quit:
|
|
s.quitMtx.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Stop the connected wallet and chain server, if any.
|
|
s.handlerMu.Lock()
|
|
wallet := s.wallet
|
|
chainClient := s.chainClient
|
|
s.handlerMu.Unlock()
|
|
if wallet != nil {
|
|
wallet.Stop()
|
|
}
|
|
if chainClient != nil {
|
|
chainClient.Stop()
|
|
}
|
|
|
|
// Stop all the listeners.
|
|
for _, listener := range s.listeners {
|
|
err := listener.Close()
|
|
if err != nil {
|
|
log.Errorf("Cannot close listener `%s`: %v",
|
|
listener.Addr(), err)
|
|
}
|
|
}
|
|
|
|
// Signal the remaining goroutines to stop.
|
|
close(s.quit)
|
|
s.quitMtx.Unlock()
|
|
|
|
// First wait for the wallet and chain server to stop, if they
|
|
// were ever set.
|
|
if wallet != nil {
|
|
wallet.WaitForShutdown()
|
|
}
|
|
if chainClient != nil {
|
|
chainClient.WaitForShutdown()
|
|
}
|
|
|
|
// Wait for all remaining goroutines to exit.
|
|
s.wg.Wait()
|
|
}
|
|
|
|
// SetChainServer sets the chain server client component needed to run a fully
|
|
// functional bitcoin wallet RPC server. This can be called to enable RPC
|
|
// passthrough even before a loaded wallet is set, but the wallet's RPC client
|
|
// is preferred.
|
|
func (s *Server) SetChainServer(chainClient *chain.RPCClient) {
|
|
s.handlerMu.Lock()
|
|
s.chainClient = chainClient
|
|
s.handlerMu.Unlock()
|
|
}
|
|
|
|
// handlerClosure creates a closure function for handling requests of the given
|
|
// method. This may be a request that is handled directly by btcwallet, or
|
|
// a chain server request that is handled by passing the request down to btcd.
|
|
//
|
|
// NOTE: These handlers do not handle special cases, such as the authenticate
|
|
// method. Each of these must be checked beforehand (the method is already
|
|
// known) and handled accordingly.
|
|
func (s *Server) handlerClosure(request *btcjson.Request) lazyHandler {
|
|
s.handlerMu.Lock()
|
|
// With the lock held, make copies of these pointers for the closure.
|
|
wallet := s.wallet
|
|
chainClient := s.chainClient
|
|
if wallet != nil && chainClient == nil {
|
|
chainClient = wallet.ChainClient()
|
|
s.chainClient = chainClient
|
|
}
|
|
s.handlerMu.Unlock()
|
|
|
|
return lazyApplyHandler(request, wallet, chainClient)
|
|
}
|
|
|
|
// ErrNoAuth represents an error where authentication could not succeed
|
|
// due to a missing Authorization HTTP header.
|
|
var ErrNoAuth = errors.New("no auth")
|
|
|
|
// checkAuthHeader checks the HTTP Basic authentication supplied by a client
|
|
// in the HTTP request r. It errors with ErrNoAuth if the request does not
|
|
// contain the Authorization header, or another non-nil error if the
|
|
// authentication was provided but incorrect.
|
|
//
|
|
// This check is time-constant.
|
|
func (s *Server) checkAuthHeader(r *http.Request) error {
|
|
authhdr := r.Header["Authorization"]
|
|
if len(authhdr) == 0 {
|
|
return ErrNoAuth
|
|
}
|
|
|
|
authsha := fastsha256.Sum256([]byte(authhdr[0]))
|
|
cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:])
|
|
if cmp != 1 {
|
|
return errors.New("bad auth")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// throttledFn wraps an http.HandlerFunc with throttling of concurrent active
|
|
// clients by responding with an HTTP 429 when the threshold is crossed.
|
|
func throttledFn(threshold int64, f http.HandlerFunc) http.Handler {
|
|
return throttled(threshold, f)
|
|
}
|
|
|
|
// throttled wraps an http.Handler with throttling of concurrent active
|
|
// clients by responding with an HTTP 429 when the threshold is crossed.
|
|
func throttled(threshold int64, h http.Handler) http.Handler {
|
|
var active int64
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
current := atomic.AddInt64(&active, 1)
|
|
defer atomic.AddInt64(&active, -1)
|
|
|
|
if current-1 >= threshold {
|
|
log.Warnf("Reached threshold of %d concurrent active clients", threshold)
|
|
http.Error(w, "429 Too Many Requests", 429)
|
|
return
|
|
}
|
|
|
|
h.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
|
|
// sanitizeRequest returns a sanitized string for the request which may be
|
|
// safely logged. It is intended to strip private keys, passphrases, and any
|
|
// other secrets from request parameters before they may be saved to a log file.
|
|
func sanitizeRequest(r *btcjson.Request) string {
|
|
// These are considered unsafe to log, so sanitize parameters.
|
|
switch r.Method {
|
|
case "encryptwallet", "importprivkey", "importwallet",
|
|
"signrawtransaction", "walletpassphrase",
|
|
"walletpassphrasechange":
|
|
|
|
return fmt.Sprintf(`{"id":%v,"method":"%s","params":SANITIZED %d parameters}`,
|
|
r.ID, r.Method, len(r.Params))
|
|
}
|
|
|
|
return fmt.Sprintf(`{"id":%v,"method":"%s","params":%v}`, r.ID,
|
|
r.Method, r.Params)
|
|
}
|
|
|
|
// idPointer returns a pointer to the passed ID, or nil if the interface is nil.
|
|
// Interface pointers are usually a red flag of doing something incorrectly,
|
|
// but this is only implemented here to work around an oddity with btcjson,
|
|
// which uses empty interface pointers for response IDs.
|
|
func idPointer(id interface{}) (p *interface{}) {
|
|
if id != nil {
|
|
p = &id
|
|
}
|
|
return
|
|
}
|
|
|
|
// invalidAuth checks whether a websocket request is a valid (parsable)
|
|
// authenticate request and checks the supplied username and passphrase
|
|
// against the server auth.
|
|
func (s *Server) invalidAuth(req *btcjson.Request) bool {
|
|
cmd, err := btcjson.UnmarshalCmd(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
authCmd, ok := cmd.(*btcjson.AuthenticateCmd)
|
|
if !ok {
|
|
return false
|
|
}
|
|
// Check credentials.
|
|
login := authCmd.Username + ":" + authCmd.Passphrase
|
|
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
|
|
authSha := fastsha256.Sum256([]byte(auth))
|
|
return subtle.ConstantTimeCompare(authSha[:], s.authsha[:]) != 1
|
|
}
|
|
|
|
func (s *Server) websocketClientRead(wsc *websocketClient) {
|
|
for {
|
|
_, request, err := wsc.conn.ReadMessage()
|
|
if err != nil {
|
|
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
log.Warnf("Websocket receive failed from client %s: %v",
|
|
wsc.remoteAddr, err)
|
|
}
|
|
close(wsc.allRequests)
|
|
break
|
|
}
|
|
wsc.allRequests <- request
|
|
}
|
|
}
|
|
|
|
func (s *Server) websocketClientRespond(wsc *websocketClient) {
|
|
// A for-select with a read of the quit channel is used instead of a
|
|
// for-range to provide clean shutdown. This is necessary due to
|
|
// WebsocketClientRead (which sends to the allRequests chan) not closing
|
|
// allRequests during shutdown if the remote websocket client is still
|
|
// connected.
|
|
out:
|
|
for {
|
|
select {
|
|
case reqBytes, ok := <-wsc.allRequests:
|
|
if !ok {
|
|
// client disconnected
|
|
break out
|
|
}
|
|
|
|
var req btcjson.Request
|
|
err := json.Unmarshal(reqBytes, &req)
|
|
if err != nil {
|
|
if !wsc.authenticated {
|
|
// Disconnect immediately.
|
|
break out
|
|
}
|
|
resp := makeResponse(req.ID, nil,
|
|
btcjson.ErrRPCInvalidRequest)
|
|
mresp, err := json.Marshal(resp)
|
|
// We expect the marshal to succeed. If it
|
|
// doesn't, it indicates some non-marshalable
|
|
// type in the response.
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = wsc.send(mresp)
|
|
if err != nil {
|
|
break out
|
|
}
|
|
continue
|
|
}
|
|
|
|
if req.Method == "authenticate" {
|
|
if wsc.authenticated || s.invalidAuth(&req) {
|
|
// Disconnect immediately.
|
|
break out
|
|
}
|
|
wsc.authenticated = true
|
|
resp := makeResponse(req.ID, nil, nil)
|
|
// Expected to never fail.
|
|
mresp, err := json.Marshal(resp)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = wsc.send(mresp)
|
|
if err != nil {
|
|
break out
|
|
}
|
|
continue
|
|
}
|
|
|
|
if !wsc.authenticated {
|
|
// Disconnect immediately.
|
|
break out
|
|
}
|
|
|
|
switch req.Method {
|
|
case "stop":
|
|
resp := makeResponse(req.ID,
|
|
"btcwallet stopping.", nil)
|
|
mresp, err := json.Marshal(resp)
|
|
// Expected to never fail.
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = wsc.send(mresp)
|
|
if err != nil {
|
|
break out
|
|
}
|
|
s.requestProcessShutdown()
|
|
break
|
|
|
|
default:
|
|
req := req // Copy for the closure
|
|
f := s.handlerClosure(&req)
|
|
wsc.wg.Add(1)
|
|
go func() {
|
|
resp, jsonErr := f()
|
|
mresp, err := btcjson.MarshalResponse(req.ID, resp, jsonErr)
|
|
if err != nil {
|
|
log.Errorf("Unable to marshal response: %v", err)
|
|
} else {
|
|
_ = wsc.send(mresp)
|
|
}
|
|
wsc.wg.Done()
|
|
}()
|
|
}
|
|
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
// Remove websocket client from notification group, or if the server is
|
|
// shutting down, wait until the notification handler has finished
|
|
// running. This is needed to ensure that no more notifications will be
|
|
// sent to the client's responses chan before it's closed below.
|
|
select {
|
|
case s.unregisterWSC <- wsc:
|
|
case <-s.quit:
|
|
<-s.notificationHandlerQuit
|
|
}
|
|
|
|
// allow client to disconnect after all handler goroutines are done
|
|
wsc.wg.Wait()
|
|
close(wsc.responses)
|
|
s.wg.Done()
|
|
}
|
|
|
|
func (s *Server) websocketClientSend(wsc *websocketClient) {
|
|
const deadline time.Duration = 2 * time.Second
|
|
out:
|
|
for {
|
|
select {
|
|
case response, ok := <-wsc.responses:
|
|
if !ok {
|
|
// client disconnected
|
|
break out
|
|
}
|
|
err := wsc.conn.SetWriteDeadline(time.Now().Add(deadline))
|
|
if err != nil {
|
|
log.Warnf("Cannot set write deadline on "+
|
|
"client %s: %v", wsc.remoteAddr, err)
|
|
}
|
|
err = wsc.conn.WriteMessage(websocket.TextMessage,
|
|
response)
|
|
if err != nil {
|
|
log.Warnf("Failed websocket send to client "+
|
|
"%s: %v", wsc.remoteAddr, err)
|
|
break out
|
|
}
|
|
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(wsc.quit)
|
|
log.Infof("Disconnected websocket client %s", wsc.remoteAddr)
|
|
s.wg.Done()
|
|
}
|
|
|
|
// websocketClientRPC starts the goroutines to serve JSON-RPC requests and
|
|
// notifications over a websocket connection for a single client.
|
|
func (s *Server) websocketClientRPC(wsc *websocketClient) {
|
|
log.Infof("New websocket client %s", wsc.remoteAddr)
|
|
|
|
// Clear the read deadline set before the websocket hijacked
|
|
// the connection.
|
|
if err := wsc.conn.SetReadDeadline(time.Time{}); err != nil {
|
|
log.Warnf("Cannot remove read deadline: %v", err)
|
|
}
|
|
|
|
// Add client context so notifications duplicated to each
|
|
// client are received by this client.
|
|
select {
|
|
case s.registerWSC <- wsc:
|
|
case <-s.quit:
|
|
return
|
|
}
|
|
|
|
// WebsocketClientRead is intentionally not run with the waitgroup
|
|
// so it is ignored during shutdown. This is to prevent a hang during
|
|
// shutdown where the goroutine is blocked on a read of the
|
|
// websocket connection if the client is still connected.
|
|
go s.websocketClientRead(wsc)
|
|
|
|
s.wg.Add(2)
|
|
go s.websocketClientRespond(wsc)
|
|
go s.websocketClientSend(wsc)
|
|
|
|
<-wsc.quit
|
|
}
|
|
|
|
// maxRequestSize specifies the maximum number of bytes in the request body
|
|
// that may be read from a client. This is currently limited to 4MB.
|
|
const maxRequestSize = 1024 * 1024 * 4
|
|
|
|
// PostClientRPC processes and replies to a JSON-RPC client request.
|
|
func (s *Server) PostClientRPC(w http.ResponseWriter, r *http.Request) {
|
|
body := http.MaxBytesReader(w, r.Body, maxRequestSize)
|
|
rpcRequest, err := ioutil.ReadAll(body)
|
|
if err != nil {
|
|
// TODO: what if the underlying reader errored?
|
|
http.Error(w, "413 Request Too Large.",
|
|
http.StatusRequestEntityTooLarge)
|
|
return
|
|
}
|
|
|
|
// First check whether wallet has a handler for this request's method.
|
|
// If unfound, the request is sent to the chain server for further
|
|
// processing. While checking the methods, disallow authenticate
|
|
// requests, as they are invalid for HTTP POST clients.
|
|
var req btcjson.Request
|
|
err = json.Unmarshal(rpcRequest, &req)
|
|
if err != nil {
|
|
resp, err := btcjson.MarshalResponse(req.ID, nil, btcjson.ErrRPCInvalidRequest)
|
|
if err != nil {
|
|
log.Errorf("Unable to marshal response: %v", err)
|
|
http.Error(w, "500 Internal Server Error",
|
|
http.StatusInternalServerError)
|
|
return
|
|
}
|
|
_, err = w.Write(resp)
|
|
if err != nil {
|
|
log.Warnf("Cannot write invalid request request to "+
|
|
"client: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Create the response and error from the request. Two special cases
|
|
// are handled for the authenticate and stop request methods.
|
|
var res interface{}
|
|
var jsonErr *btcjson.RPCError
|
|
var stop bool
|
|
switch req.Method {
|
|
case "authenticate":
|
|
// Drop it.
|
|
return
|
|
case "stop":
|
|
stop = true
|
|
res = "btcwallet stopping"
|
|
default:
|
|
res, jsonErr = s.handlerClosure(&req)()
|
|
}
|
|
|
|
// Marshal and send.
|
|
mresp, err := btcjson.MarshalResponse(req.ID, res, jsonErr)
|
|
if err != nil {
|
|
log.Errorf("Unable to marshal response: %v", err)
|
|
http.Error(w, "500 Internal Server Error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
_, err = w.Write(mresp)
|
|
if err != nil {
|
|
log.Warnf("Unable to respond to client: %v", err)
|
|
}
|
|
|
|
if stop {
|
|
s.requestProcessShutdown()
|
|
}
|
|
}
|
|
|
|
func (s *Server) requestProcessShutdown() {
|
|
select {
|
|
case s.requestShutdownChan <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// RequestProcessShutdown returns a channel that is sent to when an authorized
|
|
// client requests remote shutdown.
|
|
func (s *Server) RequestProcessShutdown() <-chan struct{} {
|
|
return s.requestShutdownChan
|
|
}
|
|
|
|
// Notification messages for websocket clients.
|
|
type (
|
|
wsClientNotification interface {
|
|
// This returns a slice only because some of these types result
|
|
// in multpile client notifications.
|
|
notificationCmds(w *wallet.Wallet) []interface{}
|
|
}
|
|
|
|
blockConnected wtxmgr.BlockMeta
|
|
blockDisconnected wtxmgr.BlockMeta
|
|
|
|
relevantTx chain.RelevantTx
|
|
|
|
managerLocked bool
|
|
|
|
confirmedBalance btcutil.Amount
|
|
unconfirmedBalance btcutil.Amount
|
|
|
|
btcdConnected bool
|
|
)
|
|
|
|
func (b blockConnected) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
n := btcjson.NewBlockConnectedNtfn(b.Hash.String(), b.Height, b.Time.Unix())
|
|
return []interface{}{n}
|
|
}
|
|
|
|
func (b blockDisconnected) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
n := btcjson.NewBlockDisconnectedNtfn(b.Hash.String(), b.Height, b.Time.Unix())
|
|
return []interface{}{n}
|
|
}
|
|
|
|
func (t relevantTx) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
syncBlock := w.Manager.SyncedTo()
|
|
|
|
var block *wtxmgr.Block
|
|
if t.Block != nil {
|
|
block = &t.Block.Block
|
|
}
|
|
details, err := w.TxStore.UniqueTxDetails(&t.TxRecord.Hash, block)
|
|
if err != nil {
|
|
log.Errorf("Cannot fetch transaction details for "+
|
|
"client notification: %v", err)
|
|
return nil
|
|
}
|
|
if details == nil {
|
|
log.Errorf("No details found for client transaction notification")
|
|
return nil
|
|
}
|
|
|
|
ltr := wallet.ListTransactions(details, w.Manager, syncBlock.Height,
|
|
w.ChainParams())
|
|
ntfns := make([]interface{}, len(ltr))
|
|
for i := range ntfns {
|
|
ntfns[i] = btcjson.NewNewTxNtfn(ltr[i].Account, ltr[i])
|
|
}
|
|
return ntfns
|
|
}
|
|
|
|
func (l managerLocked) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
n := btcjson.NewWalletLockStateNtfn(bool(l))
|
|
return []interface{}{n}
|
|
}
|
|
|
|
func (b confirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
n := btcjson.NewAccountBalanceNtfn("",
|
|
btcutil.Amount(b).ToBTC(), true)
|
|
return []interface{}{n}
|
|
}
|
|
|
|
func (b unconfirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
n := btcjson.NewAccountBalanceNtfn("",
|
|
btcutil.Amount(b).ToBTC(), false)
|
|
return []interface{}{n}
|
|
}
|
|
|
|
func (b btcdConnected) notificationCmds(w *wallet.Wallet) []interface{} {
|
|
n := btcjson.NewBtcdConnectedNtfn(bool(b))
|
|
return []interface{}{n}
|
|
}
|
|
|
|
func (s *Server) notificationListener() {
|
|
out:
|
|
for {
|
|
select {
|
|
case n := <-s.connectedBlocks:
|
|
s.enqueueNotification <- blockConnected(n)
|
|
case n := <-s.disconnectedBlocks:
|
|
s.enqueueNotification <- blockDisconnected(n)
|
|
case n := <-s.relevantTxs:
|
|
s.enqueueNotification <- relevantTx(n)
|
|
case n := <-s.managerLocked:
|
|
s.enqueueNotification <- managerLocked(n)
|
|
case n := <-s.confirmedBalance:
|
|
s.enqueueNotification <- confirmedBalance(n)
|
|
case n := <-s.unconfirmedBalance:
|
|
s.enqueueNotification <- unconfirmedBalance(n)
|
|
|
|
// Registration of all notifications is done by the handler so
|
|
// it doesn't require another Server mutex.
|
|
case <-s.registerWalletNtfns:
|
|
connectedBlocks, err := s.wallet.ListenConnectedBlocks()
|
|
if err != nil {
|
|
log.Errorf("Could not register for new "+
|
|
"connected block notifications: %v",
|
|
err)
|
|
continue
|
|
}
|
|
disconnectedBlocks, err := s.wallet.ListenDisconnectedBlocks()
|
|
if err != nil {
|
|
log.Errorf("Could not register for new "+
|
|
"disconnected block notifications: %v",
|
|
err)
|
|
continue
|
|
}
|
|
relevantTxs, err := s.wallet.ListenRelevantTxs()
|
|
if err != nil {
|
|
log.Errorf("Could not register for new relevant "+
|
|
"transaction notifications: %v", err)
|
|
continue
|
|
}
|
|
managerLocked, err := s.wallet.ListenLockStatus()
|
|
if err != nil {
|
|
log.Errorf("Could not register for manager "+
|
|
"lock state changes: %v", err)
|
|
continue
|
|
}
|
|
confirmedBalance, err := s.wallet.ListenConfirmedBalance()
|
|
if err != nil {
|
|
log.Errorf("Could not register for confirmed "+
|
|
"balance changes: %v", err)
|
|
continue
|
|
}
|
|
unconfirmedBalance, err := s.wallet.ListenUnconfirmedBalance()
|
|
if err != nil {
|
|
log.Errorf("Could not register for unconfirmed "+
|
|
"balance changes: %v", err)
|
|
continue
|
|
}
|
|
s.connectedBlocks = connectedBlocks
|
|
s.disconnectedBlocks = disconnectedBlocks
|
|
s.relevantTxs = relevantTxs
|
|
s.managerLocked = managerLocked
|
|
s.confirmedBalance = confirmedBalance
|
|
s.unconfirmedBalance = unconfirmedBalance
|
|
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(s.enqueueNotification)
|
|
go s.drainNotifications()
|
|
s.wg.Done()
|
|
}
|
|
|
|
func (s *Server) drainNotifications() {
|
|
for {
|
|
select {
|
|
case <-s.connectedBlocks:
|
|
case <-s.disconnectedBlocks:
|
|
case <-s.relevantTxs:
|
|
case <-s.managerLocked:
|
|
case <-s.confirmedBalance:
|
|
case <-s.unconfirmedBalance:
|
|
case <-s.registerWalletNtfns:
|
|
}
|
|
}
|
|
}
|
|
|
|
// notificationQueue manages an infinitly-growing queue of notifications that
|
|
// wallet websocket clients may be interested in. It quits when the
|
|
// enqueueNotification channel is closed, dropping any still pending
|
|
// notifications.
|
|
func (s *Server) notificationQueue() {
|
|
var q []wsClientNotification
|
|
var dequeue chan<- wsClientNotification
|
|
skipQueue := s.dequeueNotification
|
|
var next wsClientNotification
|
|
out:
|
|
for {
|
|
select {
|
|
case n, ok := <-s.enqueueNotification:
|
|
if !ok {
|
|
// Sender closed input channel.
|
|
break out
|
|
}
|
|
|
|
// Either send to out immediately if skipQueue is
|
|
// non-nil (queue is empty) and reader is ready,
|
|
// or append to the queue and send later.
|
|
select {
|
|
case skipQueue <- n:
|
|
default:
|
|
q = append(q, n)
|
|
dequeue = s.dequeueNotification
|
|
skipQueue = nil
|
|
next = q[0]
|
|
}
|
|
|
|
case dequeue <- next:
|
|
q[0] = nil // avoid leak
|
|
q = q[1:]
|
|
if len(q) == 0 {
|
|
dequeue = nil
|
|
skipQueue = s.dequeueNotification
|
|
} else {
|
|
next = q[0]
|
|
}
|
|
}
|
|
}
|
|
close(s.dequeueNotification)
|
|
s.wg.Done()
|
|
}
|
|
|
|
func (s *Server) notificationHandler() {
|
|
clients := make(map[chan struct{}]*websocketClient)
|
|
out:
|
|
for {
|
|
select {
|
|
case c := <-s.registerWSC:
|
|
clients[c.quit] = c
|
|
|
|
case c := <-s.unregisterWSC:
|
|
delete(clients, c.quit)
|
|
|
|
case nmsg, ok := <-s.dequeueNotification:
|
|
// No more notifications.
|
|
if !ok {
|
|
break out
|
|
}
|
|
|
|
// Ignore if there are no clients to receive the
|
|
// notification.
|
|
if len(clients) == 0 {
|
|
continue
|
|
}
|
|
|
|
ns := nmsg.notificationCmds(s.wallet)
|
|
for _, n := range ns {
|
|
mn, err := btcjson.MarshalCmd(nil, n)
|
|
// All notifications are expected to be
|
|
// marshalable.
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
for _, c := range clients {
|
|
if err := c.send(mn); err != nil {
|
|
delete(clients, c.quit)
|
|
}
|
|
}
|
|
}
|
|
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(s.notificationHandlerQuit)
|
|
s.wg.Done()
|
|
}
|