From 160a8431716c918070c8efaffbad4591ae604471 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Thu, 10 Jul 2014 13:11:57 -0500 Subject: [PATCH] Allow websocket conns to be established after New. ok @davecgh --- infrastructure.go | 163 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 135 insertions(+), 28 deletions(-) diff --git a/infrastructure.go b/infrastructure.go index 8102fc46..95eae990 100644 --- a/infrastructure.go +++ b/infrastructure.go @@ -37,6 +37,12 @@ var ( ErrInvalidEndpoint = errors.New("the endpoint either does not support " + "websockets or does not exist") + // ErrClientNotConnected is an error to describe the condition where a + // websocket client has been created, but the connection was never + // established. This condition differs from ErrClientDisconnect, which + // represents an established connection that was lost. + ErrClientNotConnected = errors.New("the client was never connected") + // ErrClientDisconnect is an error to describe the condition where the // client has been disconnected from the RPC server. When the // DisableAutoReconnect option is not set, any outstanding futures @@ -49,6 +55,18 @@ var ( // down. Any outstanding futures when a client shutdown occurs will // return this error as will any new requests. ErrClientShutdown = errors.New("the client has been shutdown") + + // ErrNotWebsocketClient is an error to describe the condition of + // calling a Client method intended for a websocket client when the + // client has been configured to run in HTTP POST mode instead. + ErrNotWebsocketClient = errors.New("client is not configured for " + + "websockets") + + // ErrClientAlreadyConnected is an error to describe the condition where + // a new client connection cannot be established due to a websocket + // client having already connected to the RPC server. + ErrClientAlreadyConnected = errors.New("websocket client has already " + + "connected") ) const ( @@ -127,11 +145,12 @@ type Client struct { ntfnState *notificationState // Networking infrastructure. - sendChan chan []byte - sendPostChan chan *sendPostDetails - disconnect chan struct{} - shutdown chan struct{} - wg sync.WaitGroup + sendChan chan []byte + sendPostChan chan *sendPostDetails + connEstablished chan struct{} + disconnect chan struct{} + shutdown chan struct{} + wg sync.WaitGroup } // NextID returns the next id to be used when sending a JSON-RPC message. This @@ -792,6 +811,15 @@ func (c *Client) sendCmd(cmd btcjson.Cmd) chan *response { return responseChan } + // Check whether the websocket connection has never been established, + // in which case the handler goroutines are not running. + select { + case <-c.connEstablished: + default: + responseChan <- &response{err: ErrClientNotConnected} + return responseChan + } + err := c.addRequest(cmd.Id().(uint64), &jsonRequest{ cmd: cmd, responseChan: responseChan, @@ -813,12 +841,18 @@ func (c *Client) sendCmdAndWait(cmd btcjson.Cmd) (interface{}, error) { return receiveFuture(c.sendCmd(cmd)) } -// Disconnected returns whether or not the server is disconnected. +// Disconnected returns whether or not the server is disconnected. If a +// websocket client was created but never connected, this also returns false. func (c *Client) Disconnected() bool { c.mtx.Lock() defer c.mtx.Unlock() - return c.disconnected + select { + case <-c.connEstablished: + return c.disconnected + default: + return false + } } // doDisconnect disconnects the websocket associated with the client if it @@ -841,7 +875,9 @@ func (c *Client) doDisconnect() bool { log.Tracef("Disconnecting RPC client %s", c.config.Host) close(c.disconnect) - c.wsConn.Close() + if c.wsConn != nil { + c.wsConn.Close() + } c.disconnected = true return true } @@ -922,7 +958,7 @@ func (c *Client) Shutdown() { c.doDisconnect() } -// Start begins processing input and output messages. +// start begins processing input and output messages. func (c *Client) start() { log.Tracef("Starting RPC client %s", c.config.Host) @@ -998,6 +1034,12 @@ type ConnConfig struct { // try to reconnect to the server when it has been disconnected. DisableAutoReconnect bool + // DisableConnectOnNew specifies that a websocket client connection + // should not be tried when creating the client with New. Instead, the + // client is created and returned unconnected, and Connect must be + // called manually. + DisableConnectOnNew bool + // HttpPostMode instructs the client to run using multiple independent // connections issuing HTTP POST requests instead of using the default // of websockets. Websockets are generally preferred as some of the @@ -1119,8 +1161,11 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error // when running in HTTP POST mode. var wsConn *websocket.Conn var httpClient *http.Client + connEstablished := make(chan struct{}) + var start bool if config.HttpPostMode { ntfnHandlers = nil + start = true var err error httpClient, err = newHTTPClient(config) @@ -1128,34 +1173,96 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error return nil, err } } else { - var err error - wsConn, err = dial(config) - if err != nil { - return nil, err + if !config.DisableConnectOnNew { + var err error + wsConn, err = dial(config) + if err != nil { + return nil, err + } + start = true } } log.Infof("Established connection to RPC server %s", config.Host) client := &Client{ - config: config, - wsConn: wsConn, - httpClient: httpClient, - requestMap: make(map[uint64]*list.Element), - requestList: list.New(), - ntfnHandlers: ntfnHandlers, - ntfnState: newNotificationState(), - sendChan: make(chan []byte, sendBufferSize), - sendPostChan: make(chan *sendPostDetails, sendPostBufferSize), - disconnect: make(chan struct{}), - shutdown: make(chan struct{}), + config: config, + wsConn: wsConn, + httpClient: httpClient, + requestMap: make(map[uint64]*list.Element), + requestList: list.New(), + ntfnHandlers: ntfnHandlers, + ntfnState: newNotificationState(), + sendChan: make(chan []byte, sendBufferSize), + sendPostChan: make(chan *sendPostDetails, sendPostBufferSize), + connEstablished: connEstablished, + disconnect: make(chan struct{}), + shutdown: make(chan struct{}), } - client.start() - if !client.config.HttpPostMode && !client.config.DisableAutoReconnect { - client.wg.Add(1) - go client.wsReconnectHandler() + if start { + close(connEstablished) + client.start() + if !client.config.HttpPostMode && !client.config.DisableAutoReconnect { + client.wg.Add(1) + go client.wsReconnectHandler() + } } return client, nil } + +// Connect establishes the initial websocket connection. This is necessary when +// a client was created after setting the DisableConnectOnNew field of the +// Config struct. +// +// Up to tries number of connections (each after an increasing backoff) will +// be tried if the connection can not be established. The special value of 0 +// indicates an unlimited number of connection attempts. +// +// This method will error if the client is not configured for websockets, if the +// connection has already been established, or if none of the connection +// attempts were successful. +func (c *Client) Connect(tries int) error { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.config.HttpPostMode { + return ErrNotWebsocketClient + } + if c.wsConn != nil { + return ErrClientAlreadyConnected + } + + // Begin connection attempts. Increase the backoff after each failed + // attempt, up to a maximum of one minute. + var err error + var backoff time.Duration + for i := 0; tries == 0 || i < tries; i++ { + var wsConn *websocket.Conn + wsConn, err = dial(c.config) + if err != nil { + backoff = connectionRetryInterval * time.Duration(i+1) + if backoff > time.Minute { + backoff = time.Minute + } + time.Sleep(backoff) + continue + } + + // Connection was established. Set the websocket connection + // member of the client and start the goroutines necessary + // to run the client. + c.wsConn = wsConn + close(c.connEstablished) + c.start() + if !c.config.DisableAutoReconnect { + c.wg.Add(1) + go c.wsReconnectHandler() + } + return nil + } + + // All connection attempts failed, so return the last error. + return err +}