diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 72552892..63874c1b 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -86,15 +86,11 @@ const ( // connectionRetryInterval is the amount of time to wait in between // retries when automatically reconnecting to an RPC server. connectionRetryInterval = time.Second * 5 -) -// sendPostDetails houses an HTTP POST request to send to an RPC server as well -// as the original JSON-RPC command and a channel to reply on when the server -// responds with the result. -type sendPostDetails struct { - httpRequest *http.Request - jsonRequest *jsonRequest -} + // requestRetryInterval is the initial amount of time to wait in between + // retries when sending HTTP POST requests. + requestRetryInterval = time.Millisecond * 500 +) // jsonRequest holds information about a json request that is used to properly // detect, interpret, and deliver a reply to it. @@ -183,7 +179,7 @@ type Client struct { // Networking infrastructure. sendChan chan []byte - sendPostChan chan *sendPostDetails + sendPostChan chan *jsonRequest connEstablished chan struct{} disconnect chan struct{} shutdown chan struct{} @@ -765,10 +761,50 @@ out: // handleSendPostMessage handles performing the passed HTTP request, reading the // result, unmarshalling it, and delivering the unmarshalled result to the // provided response channel. -func (c *Client) handleSendPostMessage(details *sendPostDetails) { - jReq := details.jsonRequest - log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) - httpResponse, err := c.httpClient.Do(details.httpRequest) +func (c *Client) handleSendPostMessage(jReq *jsonRequest) { + protocol := "http" + if !c.config.DisableTLS { + protocol = "https" + } + url := protocol + "://" + c.config.Host + + var err error + var backoff time.Duration + var httpResponse *http.Response + tries := 10 + for i := 0; tries == 0 || i < tries; i++ { + bodyReader := bytes.NewReader(jReq.marshalledJSON) + httpReq, err := http.NewRequest("POST", url, bodyReader) + if err != nil { + jReq.responseChan <- &Response{result: nil, err: err} + return + } + httpReq.Close = true + httpReq.Header.Set("Content-Type", "application/json") + for key, value := range c.config.ExtraHeaders { + httpReq.Header.Set(key, value) + } + + // Configure basic access authorization. + user, pass, err := c.config.getAuth() + if err != nil { + jReq.responseChan <- &Response{result: nil, err: err} + return + } + httpReq.SetBasicAuth(user, pass) + + httpResponse, err = c.httpClient.Do(httpReq) + if err != nil { + backoff = requestRetryInterval * time.Duration(i+1) + if backoff > time.Minute { + backoff = time.Minute + } + log.Debugf("Failed command [%s] with id %d attempt %d. Retrying in %v... \n", jReq.method, jReq.id, i, backoff) + time.Sleep(backoff) + continue + } + break + } if err != nil { jReq.responseChan <- &Response{err: err} return @@ -821,8 +857,8 @@ out: // Send any messages ready for send until the shutdown channel // is closed. select { - case details := <-c.sendPostChan: - c.handleSendPostMessage(details) + case jReq := <-c.sendPostChan: + c.handleSendPostMessage(jReq) case <-c.shutdown: break out @@ -834,8 +870,8 @@ out: cleanup: for { select { - case details := <-c.sendPostChan: - details.jsonRequest.responseChan <- &Response{ + case jReq := <-c.sendPostChan: + jReq.responseChan <- &Response{ result: nil, err: ErrClientShutdown, } @@ -852,7 +888,7 @@ cleanup: // sendPostRequest sends the passed HTTP request to the RPC server using the // HTTP client associated with the client. It is backed by a buffered channel, // so it will not block until the send channel is full. -func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) { +func (c *Client) sendPostRequest(jReq *jsonRequest) { // Don't send the message if shutting down. select { case <-c.shutdown: @@ -860,10 +896,9 @@ func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) { default: } - c.sendPostChan <- &sendPostDetails{ - jsonRequest: jReq, - httpRequest: httpReq, - } + log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) + + c.sendPostChan <- jReq } // newFutureError returns a new future result channel that already has the @@ -885,42 +920,6 @@ func ReceiveFuture(f chan *Response) ([]byte, error) { return r.result, r.err } -// sendPost sends the passed request to the server by issuing an HTTP POST -// request using the provided response channel for the reply. Typically a new -// connection is opened and closed for each command when using this method, -// however, the underlying HTTP client might coalesce multiple commands -// depending on several factors including the remote server configuration. -func (c *Client) sendPost(jReq *jsonRequest) { - // Generate a request to the configured RPC server. - protocol := "http" - if !c.config.DisableTLS { - protocol = "https" - } - url := protocol + "://" + c.config.Host - bodyReader := bytes.NewReader(jReq.marshalledJSON) - httpReq, err := http.NewRequest("POST", url, bodyReader) - if err != nil { - jReq.responseChan <- &Response{result: nil, err: err} - return - } - httpReq.Close = true - httpReq.Header.Set("Content-Type", "application/json") - for key, value := range c.config.ExtraHeaders { - httpReq.Header.Set(key, value) - } - - // Configure basic access authorization. - user, pass, err := c.config.getAuth() - if err != nil { - jReq.responseChan <- &Response{result: nil, err: err} - return - } - httpReq.SetBasicAuth(user, pass) - - log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) - c.sendPostRequest(httpReq, jReq) -} - // sendRequest sends the passed json request to the associated server using the // provided response channel for the reply. It handles both websocket and HTTP // POST mode depending on the configuration of the client. @@ -935,7 +934,7 @@ func (c *Client) sendRequest(jReq *jsonRequest) { log.Warn(err) } } else { - c.sendPost(jReq) + c.sendPostRequest(jReq) } return } @@ -1428,7 +1427,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error ntfnHandlers: ntfnHandlers, ntfnState: newNotificationState(), sendChan: make(chan []byte, sendBufferSize), - sendPostChan: make(chan *sendPostDetails, sendPostBufferSize), + sendPostChan: make(chan *jsonRequest, sendPostBufferSize), connEstablished: connEstablished, disconnect: make(chan struct{}), shutdown: make(chan struct{}), @@ -1642,7 +1641,7 @@ func (c *Client) sendAsync() FutureGetBulkResult { marshalledJSON: marshalledRequest, responseChan: responseChan, } - c.sendPost(&request) + c.sendPostRequest(&request) return responseChan }