Allow websocket conns to be established after New.

ok @davecgh
This commit is contained in:
Josh Rickmar 2014-07-10 13:11:57 -05:00
parent 6b8ff7f52f
commit 160a843171

View file

@ -37,6 +37,12 @@ var (
ErrInvalidEndpoint = errors.New("the endpoint either does not support " +
"websockets or does not exist")
// ErrClientNotConnected is an error to describe the condition where a
// websocket client has been created, but the connection was never
// established. This condition differs from ErrClientDisconnect, which
// represents an established connection that was lost.
ErrClientNotConnected = errors.New("the client was never connected")
// ErrClientDisconnect is an error to describe the condition where the
// client has been disconnected from the RPC server. When the
// DisableAutoReconnect option is not set, any outstanding futures
@ -49,6 +55,18 @@ var (
// down. Any outstanding futures when a client shutdown occurs will
// return this error as will any new requests.
ErrClientShutdown = errors.New("the client has been shutdown")
// ErrNotWebsocketClient is an error to describe the condition of
// calling a Client method intended for a websocket client when the
// client has been configured to run in HTTP POST mode instead.
ErrNotWebsocketClient = errors.New("client is not configured for " +
"websockets")
// ErrClientAlreadyConnected is an error to describe the condition where
// a new client connection cannot be established due to a websocket
// client having already connected to the RPC server.
ErrClientAlreadyConnected = errors.New("websocket client has already " +
"connected")
)
const (
@ -127,11 +145,12 @@ type Client struct {
ntfnState *notificationState
// Networking infrastructure.
sendChan chan []byte
sendPostChan chan *sendPostDetails
disconnect chan struct{}
shutdown chan struct{}
wg sync.WaitGroup
sendChan chan []byte
sendPostChan chan *sendPostDetails
connEstablished chan struct{}
disconnect chan struct{}
shutdown chan struct{}
wg sync.WaitGroup
}
// NextID returns the next id to be used when sending a JSON-RPC message. This
@ -792,6 +811,15 @@ func (c *Client) sendCmd(cmd btcjson.Cmd) chan *response {
return responseChan
}
// Check whether the websocket connection has never been established,
// in which case the handler goroutines are not running.
select {
case <-c.connEstablished:
default:
responseChan <- &response{err: ErrClientNotConnected}
return responseChan
}
err := c.addRequest(cmd.Id().(uint64), &jsonRequest{
cmd: cmd,
responseChan: responseChan,
@ -813,12 +841,18 @@ func (c *Client) sendCmdAndWait(cmd btcjson.Cmd) (interface{}, error) {
return receiveFuture(c.sendCmd(cmd))
}
// Disconnected returns whether or not the server is disconnected.
// Disconnected returns whether or not the server is disconnected. If a
// websocket client was created but never connected, this also returns false.
func (c *Client) Disconnected() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.disconnected
select {
case <-c.connEstablished:
return c.disconnected
default:
return false
}
}
// doDisconnect disconnects the websocket associated with the client if it
@ -841,7 +875,9 @@ func (c *Client) doDisconnect() bool {
log.Tracef("Disconnecting RPC client %s", c.config.Host)
close(c.disconnect)
c.wsConn.Close()
if c.wsConn != nil {
c.wsConn.Close()
}
c.disconnected = true
return true
}
@ -922,7 +958,7 @@ func (c *Client) Shutdown() {
c.doDisconnect()
}
// Start begins processing input and output messages.
// start begins processing input and output messages.
func (c *Client) start() {
log.Tracef("Starting RPC client %s", c.config.Host)
@ -998,6 +1034,12 @@ type ConnConfig struct {
// try to reconnect to the server when it has been disconnected.
DisableAutoReconnect bool
// DisableConnectOnNew specifies that a websocket client connection
// should not be tried when creating the client with New. Instead, the
// client is created and returned unconnected, and Connect must be
// called manually.
DisableConnectOnNew bool
// HttpPostMode instructs the client to run using multiple independent
// connections issuing HTTP POST requests instead of using the default
// of websockets. Websockets are generally preferred as some of the
@ -1119,8 +1161,11 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
// when running in HTTP POST mode.
var wsConn *websocket.Conn
var httpClient *http.Client
connEstablished := make(chan struct{})
var start bool
if config.HttpPostMode {
ntfnHandlers = nil
start = true
var err error
httpClient, err = newHTTPClient(config)
@ -1128,34 +1173,96 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
return nil, err
}
} else {
var err error
wsConn, err = dial(config)
if err != nil {
return nil, err
if !config.DisableConnectOnNew {
var err error
wsConn, err = dial(config)
if err != nil {
return nil, err
}
start = true
}
}
log.Infof("Established connection to RPC server %s",
config.Host)
client := &Client{
config: config,
wsConn: wsConn,
httpClient: httpClient,
requestMap: make(map[uint64]*list.Element),
requestList: list.New(),
ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
disconnect: make(chan struct{}),
shutdown: make(chan struct{}),
config: config,
wsConn: wsConn,
httpClient: httpClient,
requestMap: make(map[uint64]*list.Element),
requestList: list.New(),
ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
connEstablished: connEstablished,
disconnect: make(chan struct{}),
shutdown: make(chan struct{}),
}
client.start()
if !client.config.HttpPostMode && !client.config.DisableAutoReconnect {
client.wg.Add(1)
go client.wsReconnectHandler()
if start {
close(connEstablished)
client.start()
if !client.config.HttpPostMode && !client.config.DisableAutoReconnect {
client.wg.Add(1)
go client.wsReconnectHandler()
}
}
return client, nil
}
// Connect establishes the initial websocket connection. This is necessary when
// a client was created after setting the DisableConnectOnNew field of the
// Config struct.
//
// Up to tries number of connections (each after an increasing backoff) will
// be tried if the connection can not be established. The special value of 0
// indicates an unlimited number of connection attempts.
//
// This method will error if the client is not configured for websockets, if the
// connection has already been established, or if none of the connection
// attempts were successful.
func (c *Client) Connect(tries int) error {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.config.HttpPostMode {
return ErrNotWebsocketClient
}
if c.wsConn != nil {
return ErrClientAlreadyConnected
}
// Begin connection attempts. Increase the backoff after each failed
// attempt, up to a maximum of one minute.
var err error
var backoff time.Duration
for i := 0; tries == 0 || i < tries; i++ {
var wsConn *websocket.Conn
wsConn, err = dial(c.config)
if err != nil {
backoff = connectionRetryInterval * time.Duration(i+1)
if backoff > time.Minute {
backoff = time.Minute
}
time.Sleep(backoff)
continue
}
// Connection was established. Set the websocket connection
// member of the client and start the goroutines necessary
// to run the client.
c.wsConn = wsConn
close(c.connEstablished)
c.start()
if !c.config.DisableAutoReconnect {
c.wg.Add(1)
go c.wsReconnectHandler()
}
return nil
}
// All connection attempts failed, so return the last error.
return err
}