6cf22b7944
These notifications were added to support real time updates for btcgui. As the btcgui project is no longer being developed, there are no more consumers of this API, and it makes sense to remove them given their various issues (the largest being that notifications are sent unsubscribed to clients that may never be interested in them). A new notification server has already been added to the wallet package to handle notifications in an RPC-server agnostic way. This server is the means by which the wallet notifies changes for gRPC clients. If per-client registered notifications are to be re-added for the JSON-RPC server, they should be integrated with the new notification server rather than using this legacy code.
642 lines
18 KiB
Go
642 lines
18 KiB
Go
// Copyright (c) 2013-2015 The btcsuite developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package legacyrpc
|
|
|
|
import (
|
|
"crypto/subtle"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcjson"
|
|
"github.com/btcsuite/btcwallet/chain"
|
|
"github.com/btcsuite/btcwallet/wallet"
|
|
"github.com/btcsuite/fastsha256"
|
|
"github.com/btcsuite/websocket"
|
|
)
|
|
|
|
type websocketClient struct {
|
|
conn *websocket.Conn
|
|
authenticated bool
|
|
remoteAddr string
|
|
allRequests chan []byte
|
|
responses chan []byte
|
|
quit chan struct{} // closed on disconnect
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func newWebsocketClient(c *websocket.Conn, authenticated bool, remoteAddr string) *websocketClient {
|
|
return &websocketClient{
|
|
conn: c,
|
|
authenticated: authenticated,
|
|
remoteAddr: remoteAddr,
|
|
allRequests: make(chan []byte),
|
|
responses: make(chan []byte),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (c *websocketClient) send(b []byte) error {
|
|
select {
|
|
case c.responses <- b:
|
|
return nil
|
|
case <-c.quit:
|
|
return errors.New("websocket client disconnected")
|
|
}
|
|
}
|
|
|
|
// Server holds the items the RPC server may need to access (auth,
|
|
// config, shutdown, etc.)
|
|
type Server struct {
|
|
httpServer http.Server
|
|
wallet *wallet.Wallet
|
|
walletLoader *wallet.Loader
|
|
chainClient *chain.RPCClient
|
|
handlerLookup func(string) (requestHandler, bool)
|
|
handlerMu sync.Mutex
|
|
|
|
listeners []net.Listener
|
|
authsha [fastsha256.Size]byte
|
|
upgrader websocket.Upgrader
|
|
|
|
maxPostClients int64 // Max concurrent HTTP POST clients.
|
|
maxWebsocketClients int64 // Max concurrent websocket clients.
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
quitMtx sync.Mutex
|
|
|
|
requestShutdownChan chan struct{}
|
|
}
|
|
|
|
// jsonAuthFail rejects a request whose HTTP authentication failed.  It sets a
// single Basic authentication challenge header and replies with a 401 status
// and a short plain-text body.
func jsonAuthFail(w http.ResponseWriter) {
	// Set rather than Add so that exactly one challenge is sent even if
	// the header was already populated on this response.
	w.Header().Set("WWW-Authenticate", `Basic realm="btcwallet RPC"`)
	http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
}
|
|
|
|
// NewServer creates a new server for serving legacy RPC client connections,
|
|
// both HTTP POST and websocket.
|
|
func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Listener) *Server {
|
|
serveMux := http.NewServeMux()
|
|
const rpcAuthTimeoutSeconds = 10
|
|
|
|
server := &Server{
|
|
httpServer: http.Server{
|
|
Handler: serveMux,
|
|
|
|
// Timeout connections which don't complete the initial
|
|
// handshake within the allowed timeframe.
|
|
ReadTimeout: time.Second * rpcAuthTimeoutSeconds,
|
|
},
|
|
walletLoader: walletLoader,
|
|
maxPostClients: opts.MaxPOSTClients,
|
|
maxWebsocketClients: opts.MaxWebsocketClients,
|
|
listeners: listeners,
|
|
// A hash of the HTTP basic auth string is used for a constant
|
|
// time comparison.
|
|
authsha: fastsha256.Sum256(httpBasicAuth(opts.Username, opts.Password)),
|
|
upgrader: websocket.Upgrader{
|
|
// Allow all origins.
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
},
|
|
quit: make(chan struct{}),
|
|
requestShutdownChan: make(chan struct{}, 1),
|
|
}
|
|
|
|
serveMux.Handle("/", throttledFn(opts.MaxPOSTClients,
|
|
func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Connection", "close")
|
|
w.Header().Set("Content-Type", "application/json")
|
|
r.Close = true
|
|
|
|
if err := server.checkAuthHeader(r); err != nil {
|
|
log.Warnf("Unauthorized client connection attempt")
|
|
jsonAuthFail(w)
|
|
return
|
|
}
|
|
server.wg.Add(1)
|
|
server.PostClientRPC(w, r)
|
|
server.wg.Done()
|
|
}))
|
|
|
|
serveMux.Handle("/ws", throttledFn(opts.MaxWebsocketClients,
|
|
func(w http.ResponseWriter, r *http.Request) {
|
|
authenticated := false
|
|
switch server.checkAuthHeader(r) {
|
|
case nil:
|
|
authenticated = true
|
|
case ErrNoAuth:
|
|
// nothing
|
|
default:
|
|
// If auth was supplied but incorrect, rather than simply
|
|
// being missing, immediately terminate the connection.
|
|
log.Warnf("Disconnecting improperly authorized " +
|
|
"websocket client")
|
|
jsonAuthFail(w)
|
|
return
|
|
}
|
|
|
|
conn, err := server.upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Warnf("Cannot websocket upgrade client %s: %v",
|
|
r.RemoteAddr, err)
|
|
return
|
|
}
|
|
wsc := newWebsocketClient(conn, authenticated, r.RemoteAddr)
|
|
server.websocketClientRPC(wsc)
|
|
}))
|
|
|
|
for _, lis := range listeners {
|
|
server.serve(lis)
|
|
}
|
|
|
|
return server
|
|
}
|
|
|
|
// httpBasicAuth returns the UTF-8 bytes of the HTTP Basic authentication
// string:
//
//	"Basic " + base64(username + ":" + password)
//
// Standard (padded) base64 encoding is used.
func httpBasicAuth(username, password string) []byte {
	const header = "Basic "
	// Named enc rather than base64 so the imported package is not shadowed.
	enc := base64.StdEncoding

	credentials := make([]byte, 0, len(username)+len(":")+len(password))
	credentials = append(credentials, username...)
	credentials = append(credentials, ':')
	credentials = append(credentials, password...)

	// Size the output exactly: the fixed prefix plus the encoded credentials.
	output := make([]byte, len(header)+enc.EncodedLen(len(credentials)))
	copy(output, header)
	enc.Encode(output[len(header):], credentials)
	return output
}
|
|
|
|
// serve serves HTTP POST and websocket RPC for the legacy JSON-RPC RPC server.
|
|
// This function does not block on lis.Accept.
|
|
func (s *Server) serve(lis net.Listener) {
|
|
s.wg.Add(1)
|
|
go func() {
|
|
log.Infof("Listening on %s", lis.Addr())
|
|
err := s.httpServer.Serve(lis)
|
|
log.Tracef("Finished serving RPC: %v", err)
|
|
s.wg.Done()
|
|
}()
|
|
}
|
|
|
|
// RegisterWallet associates the legacy RPC server with the wallet. This
|
|
// function must be called before any wallet RPCs can be called by clients.
|
|
func (s *Server) RegisterWallet(w *wallet.Wallet) {
|
|
s.handlerMu.Lock()
|
|
s.wallet = w
|
|
s.handlerMu.Unlock()
|
|
}
|
|
|
|
// Stop gracefully shuts down the rpc server by stopping and disconnecting all
|
|
// clients, disconnecting the chain server connection, and closing the wallet's
|
|
// account files. This blocks until shutdown completes.
|
|
func (s *Server) Stop() {
|
|
s.quitMtx.Lock()
|
|
select {
|
|
case <-s.quit:
|
|
s.quitMtx.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Stop the connected wallet and chain server, if any.
|
|
s.handlerMu.Lock()
|
|
wallet := s.wallet
|
|
chainClient := s.chainClient
|
|
s.handlerMu.Unlock()
|
|
if wallet != nil {
|
|
wallet.Stop()
|
|
}
|
|
if chainClient != nil {
|
|
chainClient.Stop()
|
|
}
|
|
|
|
// Stop all the listeners.
|
|
for _, listener := range s.listeners {
|
|
err := listener.Close()
|
|
if err != nil {
|
|
log.Errorf("Cannot close listener `%s`: %v",
|
|
listener.Addr(), err)
|
|
}
|
|
}
|
|
|
|
// Signal the remaining goroutines to stop.
|
|
close(s.quit)
|
|
s.quitMtx.Unlock()
|
|
|
|
// First wait for the wallet and chain server to stop, if they
|
|
// were ever set.
|
|
if wallet != nil {
|
|
wallet.WaitForShutdown()
|
|
}
|
|
if chainClient != nil {
|
|
chainClient.WaitForShutdown()
|
|
}
|
|
|
|
// Wait for all remaining goroutines to exit.
|
|
s.wg.Wait()
|
|
}
|
|
|
|
// SetChainServer sets the chain server client component needed to run a fully
|
|
// functional bitcoin wallet RPC server. This can be called to enable RPC
|
|
// passthrough even before a loaded wallet is set, but the wallet's RPC client
|
|
// is preferred.
|
|
func (s *Server) SetChainServer(chainClient *chain.RPCClient) {
|
|
s.handlerMu.Lock()
|
|
s.chainClient = chainClient
|
|
s.handlerMu.Unlock()
|
|
}
|
|
|
|
// handlerClosure creates a closure function for handling requests of the given
|
|
// method. This may be a request that is handled directly by btcwallet, or
|
|
// a chain server request that is handled by passing the request down to btcd.
|
|
//
|
|
// NOTE: These handlers do not handle special cases, such as the authenticate
|
|
// method. Each of these must be checked beforehand (the method is already
|
|
// known) and handled accordingly.
|
|
func (s *Server) handlerClosure(request *btcjson.Request) lazyHandler {
|
|
s.handlerMu.Lock()
|
|
// With the lock held, make copies of these pointers for the closure.
|
|
wallet := s.wallet
|
|
chainClient := s.chainClient
|
|
if wallet != nil && chainClient == nil {
|
|
chainClient = wallet.ChainClient()
|
|
s.chainClient = chainClient
|
|
}
|
|
s.handlerMu.Unlock()
|
|
|
|
return lazyApplyHandler(request, wallet, chainClient)
|
|
}
|
|
|
|
// ErrNoAuth represents an error where authentication could not succeed
|
|
// due to a missing Authorization HTTP header.
|
|
var ErrNoAuth = errors.New("no auth")
|
|
|
|
// checkAuthHeader checks the HTTP Basic authentication supplied by a client
|
|
// in the HTTP request r. It errors with ErrNoAuth if the request does not
|
|
// contain the Authorization header, or another non-nil error if the
|
|
// authentication was provided but incorrect.
|
|
//
|
|
// This check is time-constant.
|
|
func (s *Server) checkAuthHeader(r *http.Request) error {
|
|
authhdr := r.Header["Authorization"]
|
|
if len(authhdr) == 0 {
|
|
return ErrNoAuth
|
|
}
|
|
|
|
authsha := fastsha256.Sum256([]byte(authhdr[0]))
|
|
cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:])
|
|
if cmp != 1 {
|
|
return errors.New("bad auth")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// throttledFn wraps an http.HandlerFunc with throttling of concurrent active
|
|
// clients by responding with an HTTP 429 when the threshold is crossed.
|
|
func throttledFn(threshold int64, f http.HandlerFunc) http.Handler {
|
|
return throttled(threshold, f)
|
|
}
|
|
|
|
// throttled wraps an http.Handler with throttling of concurrent active
|
|
// clients by responding with an HTTP 429 when the threshold is crossed.
|
|
func throttled(threshold int64, h http.Handler) http.Handler {
|
|
var active int64
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
current := atomic.AddInt64(&active, 1)
|
|
defer atomic.AddInt64(&active, -1)
|
|
|
|
if current-1 >= threshold {
|
|
log.Warnf("Reached threshold of %d concurrent active clients", threshold)
|
|
http.Error(w, "429 Too Many Requests", 429)
|
|
return
|
|
}
|
|
|
|
h.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
|
|
// sanitizeRequest returns a sanitized string for the request which may be
|
|
// safely logged. It is intended to strip private keys, passphrases, and any
|
|
// other secrets from request parameters before they may be saved to a log file.
|
|
func sanitizeRequest(r *btcjson.Request) string {
|
|
// These are considered unsafe to log, so sanitize parameters.
|
|
switch r.Method {
|
|
case "encryptwallet", "importprivkey", "importwallet",
|
|
"signrawtransaction", "walletpassphrase",
|
|
"walletpassphrasechange":
|
|
|
|
return fmt.Sprintf(`{"id":%v,"method":"%s","params":SANITIZED %d parameters}`,
|
|
r.ID, r.Method, len(r.Params))
|
|
}
|
|
|
|
return fmt.Sprintf(`{"id":%v,"method":"%s","params":%v}`, r.ID,
|
|
r.Method, r.Params)
|
|
}
|
|
|
|
// idPointer returns a pointer to the passed ID, or nil if the interface is nil.
// Interface pointers are usually a red flag of doing something incorrectly,
// but this is only implemented here to work around an oddity with btcjson,
// which uses empty interface pointers for response IDs.
//
// The returned pointer refers to a copy of id that is private to the call.
func idPointer(id interface{}) *interface{} {
	if id == nil {
		return nil
	}
	return &id
}
|
|
|
|
// invalidAuth checks whether a websocket request is a valid (parsable)
|
|
// authenticate request and checks the supplied username and passphrase
|
|
// against the server auth.
|
|
func (s *Server) invalidAuth(req *btcjson.Request) bool {
|
|
cmd, err := btcjson.UnmarshalCmd(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
authCmd, ok := cmd.(*btcjson.AuthenticateCmd)
|
|
if !ok {
|
|
return false
|
|
}
|
|
// Check credentials.
|
|
login := authCmd.Username + ":" + authCmd.Passphrase
|
|
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
|
|
authSha := fastsha256.Sum256([]byte(auth))
|
|
return subtle.ConstantTimeCompare(authSha[:], s.authsha[:]) != 1
|
|
}
|
|
|
|
func (s *Server) websocketClientRead(wsc *websocketClient) {
|
|
for {
|
|
_, request, err := wsc.conn.ReadMessage()
|
|
if err != nil {
|
|
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
log.Warnf("Websocket receive failed from client %s: %v",
|
|
wsc.remoteAddr, err)
|
|
}
|
|
close(wsc.allRequests)
|
|
break
|
|
}
|
|
wsc.allRequests <- request
|
|
}
|
|
}
|
|
|
|
func (s *Server) websocketClientRespond(wsc *websocketClient) {
|
|
// A for-select with a read of the quit channel is used instead of a
|
|
// for-range to provide clean shutdown. This is necessary due to
|
|
// WebsocketClientRead (which sends to the allRequests chan) not closing
|
|
// allRequests during shutdown if the remote websocket client is still
|
|
// connected.
|
|
out:
|
|
for {
|
|
select {
|
|
case reqBytes, ok := <-wsc.allRequests:
|
|
if !ok {
|
|
// client disconnected
|
|
break out
|
|
}
|
|
|
|
var req btcjson.Request
|
|
err := json.Unmarshal(reqBytes, &req)
|
|
if err != nil {
|
|
if !wsc.authenticated {
|
|
// Disconnect immediately.
|
|
break out
|
|
}
|
|
resp := makeResponse(req.ID, nil,
|
|
btcjson.ErrRPCInvalidRequest)
|
|
mresp, err := json.Marshal(resp)
|
|
// We expect the marshal to succeed. If it
|
|
// doesn't, it indicates some non-marshalable
|
|
// type in the response.
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = wsc.send(mresp)
|
|
if err != nil {
|
|
break out
|
|
}
|
|
continue
|
|
}
|
|
|
|
if req.Method == "authenticate" {
|
|
if wsc.authenticated || s.invalidAuth(&req) {
|
|
// Disconnect immediately.
|
|
break out
|
|
}
|
|
wsc.authenticated = true
|
|
resp := makeResponse(req.ID, nil, nil)
|
|
// Expected to never fail.
|
|
mresp, err := json.Marshal(resp)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = wsc.send(mresp)
|
|
if err != nil {
|
|
break out
|
|
}
|
|
continue
|
|
}
|
|
|
|
if !wsc.authenticated {
|
|
// Disconnect immediately.
|
|
break out
|
|
}
|
|
|
|
switch req.Method {
|
|
case "stop":
|
|
resp := makeResponse(req.ID,
|
|
"btcwallet stopping.", nil)
|
|
mresp, err := json.Marshal(resp)
|
|
// Expected to never fail.
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = wsc.send(mresp)
|
|
if err != nil {
|
|
break out
|
|
}
|
|
s.requestProcessShutdown()
|
|
break
|
|
|
|
default:
|
|
req := req // Copy for the closure
|
|
f := s.handlerClosure(&req)
|
|
wsc.wg.Add(1)
|
|
go func() {
|
|
resp, jsonErr := f()
|
|
mresp, err := btcjson.MarshalResponse(req.ID, resp, jsonErr)
|
|
if err != nil {
|
|
log.Errorf("Unable to marshal response: %v", err)
|
|
} else {
|
|
_ = wsc.send(mresp)
|
|
}
|
|
wsc.wg.Done()
|
|
}()
|
|
}
|
|
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
// allow client to disconnect after all handler goroutines are done
|
|
wsc.wg.Wait()
|
|
close(wsc.responses)
|
|
s.wg.Done()
|
|
}
|
|
|
|
func (s *Server) websocketClientSend(wsc *websocketClient) {
|
|
const deadline time.Duration = 2 * time.Second
|
|
out:
|
|
for {
|
|
select {
|
|
case response, ok := <-wsc.responses:
|
|
if !ok {
|
|
// client disconnected
|
|
break out
|
|
}
|
|
err := wsc.conn.SetWriteDeadline(time.Now().Add(deadline))
|
|
if err != nil {
|
|
log.Warnf("Cannot set write deadline on "+
|
|
"client %s: %v", wsc.remoteAddr, err)
|
|
}
|
|
err = wsc.conn.WriteMessage(websocket.TextMessage,
|
|
response)
|
|
if err != nil {
|
|
log.Warnf("Failed websocket send to client "+
|
|
"%s: %v", wsc.remoteAddr, err)
|
|
break out
|
|
}
|
|
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(wsc.quit)
|
|
log.Infof("Disconnected websocket client %s", wsc.remoteAddr)
|
|
s.wg.Done()
|
|
}
|
|
|
|
// websocketClientRPC starts the goroutines to serve JSON-RPC requests over a
|
|
// websocket connection for a single client.
|
|
func (s *Server) websocketClientRPC(wsc *websocketClient) {
|
|
log.Infof("New websocket client %s", wsc.remoteAddr)
|
|
|
|
// Clear the read deadline set before the websocket hijacked
|
|
// the connection.
|
|
if err := wsc.conn.SetReadDeadline(time.Time{}); err != nil {
|
|
log.Warnf("Cannot remove read deadline: %v", err)
|
|
}
|
|
|
|
// WebsocketClientRead is intentionally not run with the waitgroup
|
|
// so it is ignored during shutdown. This is to prevent a hang during
|
|
// shutdown where the goroutine is blocked on a read of the
|
|
// websocket connection if the client is still connected.
|
|
go s.websocketClientRead(wsc)
|
|
|
|
s.wg.Add(2)
|
|
go s.websocketClientRespond(wsc)
|
|
go s.websocketClientSend(wsc)
|
|
|
|
<-wsc.quit
|
|
}
|
|
|
|
// maxRequestSize specifies the maximum number of bytes in the request body
|
|
// that may be read from a client. This is currently limited to 4MB.
|
|
const maxRequestSize = 1024 * 1024 * 4
|
|
|
|
// PostClientRPC processes and replies to a JSON-RPC client request.
|
|
func (s *Server) PostClientRPC(w http.ResponseWriter, r *http.Request) {
|
|
body := http.MaxBytesReader(w, r.Body, maxRequestSize)
|
|
rpcRequest, err := ioutil.ReadAll(body)
|
|
if err != nil {
|
|
// TODO: what if the underlying reader errored?
|
|
http.Error(w, "413 Request Too Large.",
|
|
http.StatusRequestEntityTooLarge)
|
|
return
|
|
}
|
|
|
|
// First check whether wallet has a handler for this request's method.
|
|
// If unfound, the request is sent to the chain server for further
|
|
// processing. While checking the methods, disallow authenticate
|
|
// requests, as they are invalid for HTTP POST clients.
|
|
var req btcjson.Request
|
|
err = json.Unmarshal(rpcRequest, &req)
|
|
if err != nil {
|
|
resp, err := btcjson.MarshalResponse(req.ID, nil, btcjson.ErrRPCInvalidRequest)
|
|
if err != nil {
|
|
log.Errorf("Unable to marshal response: %v", err)
|
|
http.Error(w, "500 Internal Server Error",
|
|
http.StatusInternalServerError)
|
|
return
|
|
}
|
|
_, err = w.Write(resp)
|
|
if err != nil {
|
|
log.Warnf("Cannot write invalid request request to "+
|
|
"client: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Create the response and error from the request. Two special cases
|
|
// are handled for the authenticate and stop request methods.
|
|
var res interface{}
|
|
var jsonErr *btcjson.RPCError
|
|
var stop bool
|
|
switch req.Method {
|
|
case "authenticate":
|
|
// Drop it.
|
|
return
|
|
case "stop":
|
|
stop = true
|
|
res = "btcwallet stopping"
|
|
default:
|
|
res, jsonErr = s.handlerClosure(&req)()
|
|
}
|
|
|
|
// Marshal and send.
|
|
mresp, err := btcjson.MarshalResponse(req.ID, res, jsonErr)
|
|
if err != nil {
|
|
log.Errorf("Unable to marshal response: %v", err)
|
|
http.Error(w, "500 Internal Server Error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
_, err = w.Write(mresp)
|
|
if err != nil {
|
|
log.Warnf("Unable to respond to client: %v", err)
|
|
}
|
|
|
|
if stop {
|
|
s.requestProcessShutdown()
|
|
}
|
|
}
|
|
|
|
func (s *Server) requestProcessShutdown() {
|
|
select {
|
|
case s.requestShutdownChan <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// RequestProcessShutdown returns a channel that is sent to when an authorized
|
|
// client requests remote shutdown.
|
|
func (s *Server) RequestProcessShutdown() <-chan struct{} {
|
|
return s.requestShutdownChan
|
|
}
|