Use mtx to control disconnect of websocket client.

This commit changes the websocket client code to use a mutex for
disconnect since it's theoretically possible a non-blocking select on the
quit channel could fall through from two different goroutines thus causing
a second call to close.

ok @jrick.
This commit is contained in:
Dave Collins 2014-02-24 23:57:36 -06:00
parent 0c6d7bbeae
commit 8c7c1e84a3

View file

@ -692,12 +692,18 @@ func createMarshalledReply(id, result interface{}, replyErr error) ([]byte, erro
// ensure sending notifications from other subsystems can't block. Ultimately, // ensure sending notifications from other subsystems can't block. Ultimately,
// all messages are sent via the outHandler. // all messages are sent via the outHandler.
type wsClient struct { type wsClient struct {
sync.Mutex
// server is the RPC server that is servicing the client. // server is the RPC server that is servicing the client.
server *rpcServer server *rpcServer
// conn is the underlying websocket connection. // conn is the underlying websocket connection.
conn *websocket.Conn conn *websocket.Conn
// disconnected indicated whether or not the websocket client is
// disconnected.
disconnected bool
// addr is the remote address of the client. // addr is the remote address of the client.
addr string addr string
@ -1096,14 +1102,12 @@ cleanup:
// the number of outstanding requests a client can make without preventing or // the number of outstanding requests a client can make without preventing or
// blocking on async notifications. // blocking on async notifications.
func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) { func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
// Don't queue the message if in the process of shutting down. // Don't send the message if disconnected.
select { if c.Disconnected() {
case <-c.quit:
if doneChan != nil { if doneChan != nil {
doneChan <- false doneChan <- false
} }
return return
default:
} }
c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan} c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan}
@ -1123,29 +1127,37 @@ var ErrClientQuit = errors.New("client quit")
// ErrClientQuit. This is intended to be checked by long-running notification // ErrClientQuit. This is intended to be checked by long-running notification
// handlers to stop processing if there is no more work needed to be done. // handlers to stop processing if there is no more work needed to be done.
func (c *wsClient) QueueNotification(marshalledJSON []byte) error { func (c *wsClient) QueueNotification(marshalledJSON []byte) error {
// Don't queue the message if in the process of shutting down. // Don't queue the message if disconnected.
select { if c.Disconnected() {
case <-c.quit:
return ErrClientQuit return ErrClientQuit
default:
} }
c.ntfnChan <- marshalledJSON c.ntfnChan <- marshalledJSON
return nil return nil
} }
// Disconnected returns whether or not the websocket client is disconnected.
func (c *wsClient) Disconnected() bool {
c.Lock()
defer c.Unlock()
return c.disconnected
}
// Disconnect disconnects the websocket client. // Disconnect disconnects the websocket client.
func (c *wsClient) Disconnect() { func (c *wsClient) Disconnect() {
// Don't try to disconnect again if in the process of shutting down. c.Lock()
select { defer c.Unlock()
case <-c.quit:
// Nothing to do if already disconnected.
if c.disconnected {
return return
default:
} }
rpcsLog.Tracef("Disconnecting websocket client %s", c.addr) rpcsLog.Tracef("Disconnecting websocket client %s", c.addr)
close(c.quit) close(c.quit)
c.conn.Close() c.conn.Close()
c.disconnected = true
} }
// Start begins processing input and output messages. // Start begins processing input and output messages.