Throttle RPC and WS concurrent active clients

This change set implements tunable concurrent active clients throttling.
This commit is contained in:
Tomás Senart 2014-07-03 03:36:38 +02:00 committed by Josh Rickmar
parent def3543ba6
commit 9f4bfeb056
4 changed files with 186 additions and 82 deletions

6
cmd.go
View file

@ -182,7 +182,11 @@ func walletMain() error {
// Start account manager and open accounts. // Start account manager and open accounts.
AcctMgr.Start() AcctMgr.Start()
server, err = newRPCServer(cfg.SvrListeners) server, err = newRPCServer(
cfg.SvrListeners,
cfg.RPCMaxClients,
cfg.RPCMaxWebsockets,
)
if err != nil { if err != nil {
log.Errorf("Unable to create HTTP server: %v", err) log.Errorf("Unable to create HTTP server: %v", err)
return err return err

View file

@ -30,14 +30,16 @@ import (
) )
const ( const (
defaultCAFilename = "btcd.cert" defaultCAFilename = "btcd.cert"
defaultConfigFilename = "btcwallet.conf" defaultConfigFilename = "btcwallet.conf"
defaultBtcNet = btcwire.TestNet3 defaultBtcNet = btcwire.TestNet3
defaultLogLevel = "info" defaultLogLevel = "info"
defaultLogDirname = "logs" defaultLogDirname = "logs"
defaultLogFilename = "btcwallet.log" defaultLogFilename = "btcwallet.log"
defaultKeypoolSize = 100 defaultKeypoolSize = 100
defaultDisallowFree = false defaultDisallowFree = false
defaultRPCMaxClients = 10
defaultRPCMaxWebsockets = 25
) )
var ( var (
@ -52,28 +54,30 @@ var (
) )
type config struct { type config struct {
ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"`
CAFile string `long:"cafile" description:"File containing root certificates to authenticate a TLS connections with btcd"` CAFile string `long:"cafile" description:"File containing root certificates to authenticate a TLS connections with btcd"`
RPCConnect string `short:"c" long:"rpcconnect" description:"Hostname/IP and port of btcd RPC server to connect to (default localhost:18334, mainnet: localhost:8334, simnet: localhost:18556)"` RPCConnect string `short:"c" long:"rpcconnect" description:"Hostname/IP and port of btcd RPC server to connect to (default localhost:18334, mainnet: localhost:8334, simnet: localhost:18556)"`
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level {trace, debug, info, warn, error, critical}"` DebugLevel string `short:"d" long:"debuglevel" description:"Logging level {trace, debug, info, warn, error, critical}"`
ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"` ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"`
SvrListeners []string `long:"rpclisten" description:"Listen for RPC/websocket connections on this interface/port (default port: 18332, mainnet: 8332, simnet: 18554)"` SvrListeners []string `long:"rpclisten" description:"Listen for RPC/websocket connections on this interface/port (default port: 18332, mainnet: 8332, simnet: 18554)"`
DataDir string `short:"D" long:"datadir" description:"Directory to store wallets and transactions"` DataDir string `short:"D" long:"datadir" description:"Directory to store wallets and transactions"`
LogDir string `long:"logdir" description:"Directory to log output."` LogDir string `long:"logdir" description:"Directory to log output."`
Username string `short:"u" long:"username" description:"Username for client and btcd authorization"` Username string `short:"u" long:"username" description:"Username for client and btcd authorization"`
Password string `short:"P" long:"password" default-mask:"-" description:"Password for client and btcd authorization"` Password string `short:"P" long:"password" default-mask:"-" description:"Password for client and btcd authorization"`
BtcdUsername string `long:"btcdusername" description:"Alternative username for btcd authorization"` BtcdUsername string `long:"btcdusername" description:"Alternative username for btcd authorization"`
BtcdPassword string `long:"btcdpassword" default-mask:"-" description:"Alternative password for btcd authorization"` BtcdPassword string `long:"btcdpassword" default-mask:"-" description:"Alternative password for btcd authorization"`
RPCCert string `long:"rpccert" description:"File containing the certificate file"` RPCCert string `long:"rpccert" description:"File containing the certificate file"`
RPCKey string `long:"rpckey" description:"File containing the certificate key"` RPCKey string `long:"rpckey" description:"File containing the certificate key"`
MainNet bool `long:"mainnet" description:"Use the main Bitcoin network (default testnet3)"` RPCMaxClients int64 `long:"rpcmaxclients" description:"Max number of RPC clients for standard connections"`
SimNet bool `long:"simnet" description:"Use the simulation test network (default testnet3)"` RPCMaxWebsockets int64 `long:"rpcmaxwebsockets" description:"Max number of RPC websocket connections"`
KeypoolSize uint `short:"k" long:"keypoolsize" description:"Maximum number of addresses in keypool"` MainNet bool `long:"mainnet" description:"Use the main Bitcoin network (default testnet3)"`
DisallowFree bool `long:"disallowfree" description:"Force transactions to always include a fee"` SimNet bool `long:"simnet" description:"Use the simulation test network (default testnet3)"`
Proxy string `long:"proxy" description:"Connect via SOCKS5 proxy (eg. 127.0.0.1:9050)"` KeypoolSize uint `short:"k" long:"keypoolsize" description:"Maximum number of addresses in keypool"`
ProxyUser string `long:"proxyuser" description:"Username for proxy server"` DisallowFree bool `long:"disallowfree" description:"Force transactions to always include a fee"`
ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"` Proxy string `long:"proxy" description:"Connect via SOCKS5 proxy (eg. 127.0.0.1:9050)"`
Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"` ProxyUser string `long:"proxyuser" description:"Username for proxy server"`
ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"`
Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
} }
// cleanAndExpandPath expands environement variables and leading ~ in the // cleanAndExpandPath expands environement variables and leading ~ in the
@ -233,14 +237,16 @@ func normalizeAddress(addr, defaultPort string) string {
func loadConfig() (*config, []string, error) { func loadConfig() (*config, []string, error) {
// Default config. // Default config.
cfg := config{ cfg := config{
DebugLevel: defaultLogLevel, DebugLevel: defaultLogLevel,
ConfigFile: defaultConfigFile, ConfigFile: defaultConfigFile,
DataDir: defaultDataDir, DataDir: defaultDataDir,
LogDir: defaultLogDir, LogDir: defaultLogDir,
RPCKey: defaultRPCKeyFile, RPCKey: defaultRPCKeyFile,
RPCCert: defaultRPCCertFile, RPCCert: defaultRPCCertFile,
KeypoolSize: defaultKeypoolSize, KeypoolSize: defaultKeypoolSize,
DisallowFree: defaultDisallowFree, DisallowFree: defaultDisallowFree,
RPCMaxClients: defaultRPCMaxClients,
RPCMaxWebsockets: defaultRPCMaxWebsockets,
} }
// A config file in the current directory takes precedence. // A config file in the current directory takes precedence.

View file

@ -35,6 +35,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/conformal/btcec" "github.com/conformal/btcec"
@ -206,10 +207,12 @@ func genCertPair(certFile, keyFile string) error {
// rpcServer holds the items the RPC server may need to access (auth, // rpcServer holds the items the RPC server may need to access (auth,
// config, shutdown, etc.) // config, shutdown, etc.)
type rpcServer struct { type rpcServer struct {
wg sync.WaitGroup wg sync.WaitGroup
listeners []net.Listener maxClients int64 // Maximum number of concurrent active RPC HTTP clients
authsha [sha256.Size]byte maxWebsockets int64 // Maximum number of concurrent active RPC WS clients
wsClients map[*websocketClient]struct{} listeners []net.Listener
authsha [sha256.Size]byte
wsClients map[*websocketClient]struct{}
upgrader websocket.Upgrader upgrader websocket.Upgrader
@ -224,12 +227,14 @@ type rpcServer struct {
// newRPCServer creates a new server for serving RPC client connections, both // newRPCServer creates a new server for serving RPC client connections, both
// HTTP POST and websocket. // HTTP POST and websocket.
func newRPCServer(listenAddrs []string) (*rpcServer, error) { func newRPCServer(listenAddrs []string, maxClients, maxWebsockets int64) (*rpcServer, error) {
login := cfg.Username + ":" + cfg.Password login := cfg.Username + ":" + cfg.Password
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
s := rpcServer{ s := rpcServer{
authsha: sha256.Sum256([]byte(auth)), authsha: sha256.Sum256([]byte(auth)),
wsClients: map[*websocketClient]struct{}{}, maxClients: maxClients,
maxWebsockets: maxWebsockets,
wsClients: map[*websocketClient]struct{}{},
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
// Allow all origins. // Allow all origins.
CheckOrigin: func(r *http.Request) bool { return true }, CheckOrigin: func(r *http.Request) bool { return true },
@ -303,6 +308,7 @@ func (s *rpcServer) Start() {
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
const rpcAuthTimeoutSeconds = 10 const rpcAuthTimeoutSeconds = 10
httpServer := &http.Server{ httpServer := &http.Server{
Handler: serveMux, Handler: serveMux,
@ -310,45 +316,50 @@ func (s *rpcServer) Start() {
// handshake within the allowed timeframe. // handshake within the allowed timeframe.
ReadTimeout: time.Second * rpcAuthTimeoutSeconds, ReadTimeout: time.Second * rpcAuthTimeoutSeconds,
} }
serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "close")
w.Header().Set("Content-Type", "application/json")
r.Close = true
// TODO: Limit number of active connections. serveMux.Handle("/",
throttledFn(s.maxClients, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "close")
w.Header().Set("Content-Type", "application/json")
r.Close = true
if err := s.checkAuthHeader(r); err != nil { if err := s.checkAuthHeader(r); err != nil {
log.Warnf("Unauthorized client connection attempt") log.Warnf("Unauthorized client connection attempt")
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
return return
} }
s.PostClientRPC(w, r) s.PostClientRPC(w, r)
}) }),
serveMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { )
authenticated := false
switch s.checkAuthHeader(r) { serveMux.Handle("/ws",
case nil: throttledFn(s.maxWebsockets, func(w http.ResponseWriter, r *http.Request) {
authenticated = true authenticated := false
case ErrNoAuth: switch s.checkAuthHeader(r) {
// nothing case nil:
default: authenticated = true
// If auth was supplied but incorrect, rather than simply case ErrNoAuth:
// being missing, immediately terminate the connection. // nothing
log.Warnf("Disconnecting improperly authorized " + default:
"websocket client") // If auth was supplied but incorrect, rather than simply
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) // being missing, immediately terminate the connection.
return log.Warnf("Disconnecting improperly authorized " +
} "websocket client")
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
return
}
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warnf("Cannot websocket upgrade client %s: %v",
r.RemoteAddr, err)
return
}
wsc := newWebsocketClient(conn, authenticated, r.RemoteAddr)
s.WebsocketClientRPC(wsc)
}),
)
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warnf("Cannot websocket upgrade client %s: %v",
r.RemoteAddr, err)
return
}
wsc := newWebsocketClient(conn, authenticated, r.RemoteAddr)
s.WebsocketClientRPC(wsc)
})
for _, listener := range s.listeners { for _, listener := range s.listeners {
s.wg.Add(1) s.wg.Add(1)
go func(listener net.Listener) { go func(listener net.Listener) {
@ -428,6 +439,31 @@ func (s *rpcServer) checkAuthHeader(r *http.Request) error {
return nil return nil
} }
// throttledFn wraps an http.HandlerFunc with throttling of concurrent active
// clients by responding with an HTTP 429 when the threshold is crossed.
func throttledFn(threshold int64, f http.HandlerFunc) http.Handler {
return throttled(threshold, f)
}
// throttled wraps an http.Handler with throttling of concurrent active
// clients by responding with an HTTP 429 when the threshold is crossed.
func throttled(threshold int64, h http.Handler) http.Handler {
var active int64
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
current := atomic.AddInt64(&active, 1)
defer atomic.AddInt64(&active, -1)
if current-1 >= threshold {
log.Warnf("Reached threshold of %d concurrent active clients", threshold)
http.Error(w, "429 Too Many Requests", 429)
return
}
h.ServeHTTP(w, r)
})
}
func (s *rpcServer) WebsocketClientRead(wsc *websocketClient) { func (s *rpcServer) WebsocketClientRead(wsc *websocketClient) {
for { for {
_, request, err := wsc.conn.ReadMessage() _, request, err := wsc.conn.ReadMessage()
@ -746,6 +782,8 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) {
// Send initial unsolicited notifications. // Send initial unsolicited notifications.
// TODO: these should be requested by the client first. // TODO: these should be requested by the client first.
s.NotifyConnectionStatus(wsc) s.NotifyConnectionStatus(wsc)
<-wsc.quit
} }
// maxRequestSize specifies the maximum number of bytes in the request body // maxRequestSize specifies the maximum number of bytes in the request body

56
rpcserver_test.go Normal file
View file

@ -0,0 +1,56 @@
/*
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package main
import (
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
)
func TestThrottle(t *testing.T) {
const threshold = 1
srv := httptest.NewServer(throttledFn(threshold,
func(w http.ResponseWriter, r *http.Request) {
time.Sleep(20 * time.Millisecond)
}),
)
codes := make(chan int, 2)
for i := 0; i < cap(codes); i++ {
go func() {
res, err := http.Get(srv.URL)
if err != nil {
t.Fatal(err)
}
codes <- res.StatusCode
}()
}
got := make(map[int]int, cap(codes))
for i := 0; i < cap(codes); i++ {
got[<-codes]++
}
want := map[int]int{200: 1, 429: 1}
if !reflect.DeepEqual(want, got) {
t.Fatalf("status codes: want: %v, got: %v", want, got)
}
}