Add support for the authenticate extension request.

Just like btcd, this commit adds support for the authenticate request
allowing clients unable to set the HTTP Authorization header to
authenticate to use the RPC server.  The rules for the authenticate
request are as follows:

1) Authentication make clients stateful.  Clients may either be flagged
   as authenticated or unauthenticated.

2) Clients may authenticate by exactly one of two possible ways,
   either by setting the Authorization header or by sending a JSON-RPC
   authenticate request as follows:

   {
     "jsonrpc":"1.0",
     "id":0,
     "method":"authenticate",
     "params":["rpcuser", "rpcpass"]
   }

3) When not authenticated by the Authorization header, the first request
   must be an authenticate request.

4) Sending an authenticate request after a client has already
   successfully authenticated (either by the Authorization header or a
   previous authentication request) is invalid.

5) The result used in the response to a successful authenticate request
   is a JSON null.  For any unsuccessful or invalid authenticate
   requests, the connection is terminated.

This change also orders all incoming requests for a client.  This was
required to ensure that any authentication requests are processed
first.
This commit is contained in:
Josh Rickmar 2014-03-20 09:07:05 -05:00
parent c51cbb3332
commit 4f1d2e7121

View file

@ -43,6 +43,10 @@ import (
) )
var ( var (
// ErrBadAuth represents an error where a request is denied due to
// a missing, incorrect, or duplicate authentication request.
ErrBadAuth = errors.New("bad auth")
// ErrConnRefused represents an error where a connection to another // ErrConnRefused represents an error where a connection to another
// process cannot be established. // process cannot be established.
ErrConnRefused = errors.New("connection refused") ErrConnRefused = errors.New("connection refused")
@ -67,8 +71,8 @@ type server struct {
} }
type clientContext struct { type clientContext struct {
send chan []byte send chan []byte
disconnected chan struct{} // closed on disconnect quit chan struct{} // closed on disconnect
} }
// parseListeners splits the list of listen addresses passed in addrs into // parseListeners splits the list of listen addresses passed in addrs into
@ -214,20 +218,48 @@ func ParseRequest(msg []byte) (btcjson.Cmd, *btcjson.Error) {
// ReplyToFrontend responds to a marshaled JSON-RPC request with a // ReplyToFrontend responds to a marshaled JSON-RPC request with a
// marshaled JSON-RPC response for both standard and extension // marshaled JSON-RPC response for both standard and extension
// (websocket) clients. // (websocket) clients. The returned error is ErrBadAuth if a
func ReplyToFrontend(msg []byte, ws bool) []byte { // missing, incorrect, or duplicate authentication request is
// received.
func (s *server) ReplyToFrontend(msg []byte, ws, authenticated bool) ([]byte, error) {
cmd, jsonErr := ParseRequest(msg) cmd, jsonErr := ParseRequest(msg)
var id interface{} var id interface{}
if cmd != nil { if cmd != nil {
id = cmd.Id() id = cmd.Id()
} }
// If client is not already authenticated, the parsed request must
// be for authentication.
authCmd, ok := cmd.(*btcws.AuthenticateCmd)
if authenticated {
if ok {
// Duplicate auth request.
return nil, ErrBadAuth
}
} else {
if !ok {
// The first unauthenticated request must be an auth request.
return nil, ErrBadAuth
}
// Check credentials.
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := sha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], s.authsha[:])
if cmp != 1 {
return nil, ErrBadAuth
}
return nil, nil
}
if jsonErr != nil { if jsonErr != nil {
response := btcjson.Reply{ response := btcjson.Reply{
Id: &id, Id: &id,
Error: jsonErr, Error: jsonErr,
} }
mresponse, _ := json.Marshal(response) mresponse, _ := json.Marshal(response)
return mresponse return mresponse, nil
} }
cReq := NewClientRequest(cmd, ws) cReq := NewClientRequest(cmd, ws)
@ -248,7 +280,7 @@ func ReplyToFrontend(msg []byte, ws bool) []byte {
mresponse, _ = json.Marshal(&response) mresponse, _ = json.Marshal(&response)
} }
return mresponse return mresponse, nil
} }
// ServeRPCRequest processes and replies to a JSON-RPC client request. // ServeRPCRequest processes and replies to a JSON-RPC client request.
@ -258,7 +290,11 @@ func (s *server) ServeRPCRequest(w http.ResponseWriter, r *http.Request) {
log.Errorf("RPCS: Error getting JSON message: %v", err) log.Errorf("RPCS: Error getting JSON message: %v", err)
} }
resp := ReplyToFrontend(body, false) resp, err := s.ReplyToFrontend(body, false, true)
if err == ErrBadAuth {
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
return
}
if _, err := w.Write(resp); err != nil { if _, err := w.Write(resp); err != nil {
log.Warnf("RPCS: could not respond to RPC request: %v", err) log.Warnf("RPCS: could not respond to RPC request: %v", err)
} }
@ -277,7 +313,7 @@ func clientResponseDuplicator() {
case n := <-allClients: case n := <-allClients:
for cc := range clients { for cc := range clients {
select { select {
case <-cc.disconnected: case <-cc.quit:
delete(clients, cc) delete(clients, cc)
case cc.send <- n: case cc.send <- n:
} }
@ -297,76 +333,176 @@ func NotifyBtcdConnection(reply chan []byte) {
} }
// stringQueue manages a queue of strings, reading from in and sending
// the oldest unsent to out. This handler closes out and returns after
// in is closed and any queued items are sent. Any reads on quit result
// in immediate shutdown of the handler.
func stringQueue(in <-chan string, out chan<- string, quit <-chan struct{}) {
var q []string
var dequeue chan<- string
skipQueue := out
var next string
out:
for {
select {
case n, ok := <-in:
if !ok {
// Sender closed input channel. Nil channel
// and continue so the remaining queued
// items may be sent. If the queue is empty,
// break out of the loop.
in = nil
if dequeue == nil {
break out
}
continue
}
// Either send to out immediately if skipQueue is
// non-nil (queue is empty) and reader is ready,
// or append to the queue and send later.
select {
case skipQueue <- n:
default:
q = append(q, n)
dequeue = out
skipQueue = nil
next = q[0]
}
case dequeue <- next:
copy(q, q[1:])
q[len(q)-1] = "" // avoid leak
q = q[:len(q)-1]
if len(q) == 0 {
// If the input chan was closed and nil'd,
// break out of the loop.
if in == nil {
break out
}
dequeue = nil
skipQueue = out
} else {
next = q[0]
}
case <-quit:
break out
}
}
close(out)
}
// WSSendRecv is the handler for websocket client connections. It loops // WSSendRecv is the handler for websocket client connections. It loops
// forever (until disconnected), reading JSON-RPC requests and sending // forever (until disconnected), reading JSON-RPC requests and sending
// sending responses and notifications. // sending responses and notifications.
func WSSendRecv(ws *websocket.Conn) { func (s *server) WSSendRecv(ws *websocket.Conn, authenticated bool) {
// Add client context so notifications duplicated to each // Add client context so notifications duplicated to each
// client are received by this client. // client are received by this client.
recvQuit := make(chan struct{})
sendQuit := make(chan struct{})
cc := clientContext{ cc := clientContext{
send: make(chan []byte, 1), // buffer size is number of initial notifications send: make(chan []byte, 1), // buffer size is number of initial notifications
disconnected: make(chan struct{}), quit: make(chan struct{}),
} }
go func() {
select {
case <-recvQuit:
case <-sendQuit:
}
close(cc.quit)
}()
NotifyBtcdConnection(cc.send) // TODO(jrick): clients should explicitly request this. NotifyBtcdConnection(cc.send) // TODO(jrick): clients should explicitly request this.
addClient <- cc addClient <- cc
defer close(cc.disconnected)
// received passes all received messages from the currently connected // received passes all received messages from the currently connected
// frontend to the for-select loop. It is closed when reading a // frontend to the for-select loop. It is closed when reading a
// message from the websocket connection fails (presumably due to // message from the websocket connection fails (presumably due to
// a disconnected client). // a disconnected client).
received := make(chan []byte) recvQueueIn := make(chan string)
// Receive messages from websocket and send across jsonMsgs until // Receive messages from websocket and send across jsonMsgs until
// connection is lost // connection is lost
go func() { go func() {
for { for {
var m []byte var m string
if err := websocket.Message.Receive(ws, &m); err != nil { if err := websocket.Message.Receive(ws, &m); err != nil {
// Log warning if the client did not disconnect. select {
if err != io.EOF { case <-sendQuit:
log.Warnf("Cannot receive client websocket message: %v", // Do not log error.
err)
default:
// Log warning if the client did not disconnect.
if err != io.EOF {
log.Warnf("Cannot receive client websocket message: %v",
err)
}
} }
close(received) close(recvQueueIn)
close(recvQuit)
return return
} }
received <- m recvQueueIn <- m
} }
}() }()
// Manage queue of received messages for LIFO processing.
recvQueueOut := make(chan string)
go stringQueue(recvQueueIn, recvQueueOut, cc.quit)
badAuth := make(chan struct{})
go func() {
for m := range recvQueueOut {
resp, err := s.ReplyToFrontend([]byte(m), true, authenticated)
if err == ErrBadAuth {
select {
case badAuth <- struct{}{}:
case <-cc.quit:
}
return
}
// Authentication passed.
authenticated = true
select {
case cc.send <- resp:
case <-cc.quit:
}
}
close(cc.send)
}()
const deadline time.Duration = 2 * time.Second const deadline time.Duration = 2 * time.Second
out:
for { for {
select { select {
case m, ok := <-received: case <-badAuth:
// Bad auth. Disconnect.
log.Warnf("Disconnecting improperly authorized websocket client")
ws.Close()
break out
case m, ok := <-cc.send:
if !ok { if !ok {
// client disconnected. // Nothing left to send. Return so the handler exits.
return break out
} }
// Handle request here.
go func(m []byte) {
resp := ReplyToFrontend(m, true)
select {
case cc.send <- resp:
case <-cc.disconnected:
}
}(m)
case m := <-cc.send:
err := ws.SetWriteDeadline(time.Now().Add(deadline)) err := ws.SetWriteDeadline(time.Now().Add(deadline))
if err != nil { if err != nil {
log.Errorf("Cannot set write deadline: %v", err) log.Errorf("Cannot set write deadline: %v", err)
return break out
} }
err = websocket.Message.Send(ws, m) err = websocket.Message.Send(ws, string(m))
if err != nil { if err != nil {
log.Infof("Cannot complete client websocket send: %v", err) log.Infof("Cannot complete client websocket send: %v", err)
return break out
} }
} }
} }
close(sendQuit)
} }
// NotifyNewBlockChainHeight notifies all frontends of a new // NotifyNewBlockChainHeight notifies all frontends of a new
@ -396,17 +532,27 @@ func (s *server) Start() {
httpServer := &http.Server{Handler: serveMux} httpServer := &http.Server{Handler: serveMux}
serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if err := s.checkAuth(r); err != nil { if err := s.checkAuth(r); err != nil {
log.Warnf("Unauthorized client connection attempt")
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
return return
} }
s.ServeRPCRequest(w, r) s.ServeRPCRequest(w, r)
}) })
serveMux.HandleFunc("/frontend", func(w http.ResponseWriter, r *http.Request) { serveMux.HandleFunc("/frontend", func(w http.ResponseWriter, r *http.Request) {
if err := s.checkAuth(r); err != nil { authenticated := false
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) if err := s.checkAuth(r); err == nil {
return authenticated = true
} }
websocket.Handler(WSSendRecv).ServeHTTP(w, r)
// A new Server instance is created rather than just creating the
// handler closure since the default server will disconnect the
// client if the origin is unset.
wsServer := websocket.Server{
Handler: websocket.Handler(func(ws *websocket.Conn) {
s.WSSendRecv(ws, authenticated)
}),
}
wsServer.ServeHTTP(w, r)
}) })
for _, listener := range s.listeners { for _, listener := range s.listeners {
s.wg.Add(1) s.wg.Add(1)
@ -428,15 +574,13 @@ func (s *server) Start() {
func (s *server) checkAuth(r *http.Request) error { func (s *server) checkAuth(r *http.Request) error {
authhdr := r.Header["Authorization"] authhdr := r.Header["Authorization"]
if len(authhdr) <= 0 { if len(authhdr) <= 0 {
log.Infof("Frontend did not supply authentication.") return ErrBadAuth
return errors.New("auth failure")
} }
authsha := sha256.Sum256([]byte(authhdr[0])) authsha := sha256.Sum256([]byte(authhdr[0]))
cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:]) cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:])
if cmp != 1 { if cmp != 1 {
log.Infof("Frontend did not supply correct authentication.") return ErrBadAuth
return errors.New("auth failure")
} }
return nil return nil
} }