// Copyright (c) 2014 Conformal Systems LLC. // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package btcrpcclient import ( "bytes" "code.google.com/p/go.net/websocket" "container/list" "crypto/tls" "crypto/x509" "encoding/base64" "encoding/json" "errors" "fmt" "github.com/conformal/btcjson" "github.com/conformal/go-socks" "net" "net/http" "net/url" "sync" "sync/atomic" "time" ) var ( // ErrInvalidAuth is an error to describe the condition where the client // is either unable to authenticate or the specified endpoint is // incorrect. ErrInvalidAuth = errors.New("authentication failure") // ErrClientDisconnect is an error to describe the condition where the // client has been disconnected from the RPC server. When the // DisableAutoReconnect option is not set, any outstanding futures // when a client disconnect occurs will return this error as will // any new requests. ErrClientDisconnect = errors.New("the client has been disconnected") // ErrClientShutdown is an error to describe the condition where the // client is either already shutdown, or in the process of shutting // down. Any outstanding futures when a client shutdown occurs will // return this error as will any new requests. ErrClientShutdown = errors.New("the client has been shutdown") ) const ( // sendBufferSize is the number of elements the websocket send channel // can queue before blocking. sendBufferSize = 50 // sendPostBufferSize is the number of elements the HTTP POST send // channel can queue before blocking. sendPostBufferSize = 100 // connectionRetryInterval is the amount of time to wait in between // retries when automatically reconnecting to an RPC server. connectionRetryInterval = time.Second * 5 ) // futureResult holds information about a future promise to deliver the result // of an asynchronous request. type futureResult struct { reply *btcjson.Reply err error } // sendPostDetails houses an HTTP POST request to send to an RPC server as well // as the original JSON-RPC command and a channel to reply on when the server // responds with the result. type sendPostDetails struct { command btcjson.Cmd request *http.Request responseChan chan *futureResult } // jsonRequest holds information about a json request that is used to properly // detect, interpret, and deliver a reply to it. type jsonRequest struct { cmd btcjson.Cmd responseChan chan *futureResult } // Client represents a Bitcoin RPC client which allows easy access to the // various RPC methods available on a Bitcoin RPC server. Each of the wrapper // functions handle the details of converting the passed and return types to and // from the underlying JSON types which are required for the JSON-RPC // invocations // // The client provides each RPC in both synchronous (blocking) and asynchronous // (non-blocking) forms. The asynchronous forms are based on the concept of // futures where they return an instance of a type that promises to deliver the // result of the invocation at some future time. Invoking the Receive method on // the returned future will block until the result is available if it's not // already. type Client struct { id int64 // atomic, so must stay 64-bit aligned // config holds the connection configuration assoiated with this client. config *ConnConfig // wsConn is the underlying websocket connection when not in HTTP POST // mode. wsConn *websocket.Conn // httpClient is the underlying HTTP client to use when running in HTTP // POST mode. httpClient *http.Client // mtx is a mutex to protect access to connection related fields. mtx sync.Mutex // disconnected indicated whether or not the server is disconnected. disconnected bool // retryCount holds the number of times the client has tried to // reconnect to the RPC server. retryCount int64 // Track command and their response channels by ID. requestLock sync.Mutex requestMap map[int64]*list.Element requestList *list.List // Notification handlers. ntfnHandlers *NotificationHandlers // Networking infrastructure. sendChan chan []byte sendPostChan chan *sendPostDetails disconnect chan struct{} shutdown chan struct{} wg sync.WaitGroup } // NextID returns the next id to be used when sending a JSON-RPC message. This // ID allows responses to be associated with particular requests per the // JSON-RPC specification. Typically the consumer of the client does not need // to call this function, however, if a custom request is being created and used // this function should be used to ensure the ID is unique amongst all requests // being made. func (c *Client) NextID() int64 { return atomic.AddInt64(&c.id, 1) } // addRequest associates the passed jsonRequest with the passed id. This allows // the response from the remote server to be unmarshalled to the appropriate // type and sent to the specified channel when it is received. // // This function is safe for concurrent access. func (c *Client) addRequest(id int64, request *jsonRequest) { c.requestLock.Lock() defer c.requestLock.Unlock() // TODO(davec): Already there? element := c.requestList.PushBack(request) c.requestMap[id] = element } // removeRequest returns and removes the jsonRequest which contains the response // channel and original method associated with the passed id or nil if there is // no association. // // This function is safe for concurrent access. func (c *Client) removeRequest(id int64) *jsonRequest { c.requestLock.Lock() defer c.requestLock.Unlock() element := c.requestMap[id] if element != nil { delete(c.requestMap, id) request := c.requestList.Remove(element).(*jsonRequest) return request } return nil } // removeAllRequests removes all the jsonRequests which contain the response // channels for outstanding requests. // // This function is safe for concurrent access. func (c *Client) removeAllRequests() { c.requestLock.Lock() defer c.requestLock.Unlock() c.requestMap = make(map[int64]*list.Element) c.requestList.Init() } // handleMessage is the main handler for incoming requests. It enforces // authentication, parses the incoming json, looks up and executes handlers // (including pass through for standard RPC commands), sends the appropriate // response. It also detects commands which are marked as long-running and // sends them off to the asyncHander for processing. func (c *Client) handleMessage(msg string) { // Attempt to unmarshal the message as a known JSON-RPC command. if cmd, err := btcjson.ParseMarshaledCmd([]byte(msg)); err == nil { // Commands that have an ID associated with them are not // notifications. Since this is a client, it should not // be receiving non-notifications. if cmd.Id() != nil { // Invalid response log.Warnf("Remote server sent a non-notification "+ "JSON-RPC Request (Id: %v)", cmd.Id()) return } // Deliver the notification. log.Tracef("Received notification [%s]", cmd.Method()) c.handleNotification(cmd) return } // The message was not a command/notification, so it should be a reply // to a previous request. var r btcjson.Reply if err := json.Unmarshal([]byte(msg), &r); err != nil { log.Warnf("Unable to unmarshal inbound message as " + "notification or response") return } // Ensure the reply has an id. if r.Id == nil { log.Warnf("Received response with no id") return } // Ensure the id is the expected type. fid, ok := (*r.Id).(float64) if !ok { log.Warnf("Received unexpected id type: %T (value %v)", *r.Id, *r.Id) return } id := int64(fid) log.Tracef("Received response for id %d (result %v)", id, r.Result) request := c.removeRequest(id) // Nothing more to do if there is no request associated with this reply. if request == nil || request.responseChan == nil { log.Warnf("Received unexpected reply: %s (id %d)", r.Result, id) return } // Unmarshal the reply into a concrete result if possible and deliver // it to the associated channel. reply, err := btcjson.ReadResultCmd(request.cmd.Method(), []byte(msg)) if err != nil { log.Warnf("Failed to unmarshal reply to command [%s] "+ "(id %d): %v", request.cmd.Method(), id, err) request.responseChan <- &futureResult{reply: nil, err: err} return } request.responseChan <- &futureResult{reply: &reply, err: nil} } // wsInHandler handles all incoming messages for the websocket connection // associated with the client. It must be run as a goroutine. func (c *Client) wsInHandler() { out: for { // Break out of the loop once the shutdown channel has been // closed. Use a non-blocking select here so we fall through // otherwise. select { case <-c.shutdown: break out default: } var msg string if err := websocket.Message.Receive(c.wsConn, &msg); err != nil { // Log the error if it's not due to disconnecting. if _, ok := err.(*net.OpError); !ok { log.Errorf("Websocket receive error from "+ "%s: %v", c.config.Host, err) } break out } c.handleMessage(msg) } // Ensure the connection is closed. c.Disconnect() c.wg.Done() log.Tracef("RPC client input handler done for %s", c.config.Host) } // wsOutHandler handles all outgoing messages for the websocket connection. It // uses a buffered channel to serialize output messages while allowing the // sender to continue running asynchronously. It must be run as a goroutine. func (c *Client) wsOutHandler() { out: for { // Send any messages ready for send until the client is // disconnected closed. select { case msg := <-c.sendChan: err := websocket.Message.Send(c.wsConn, string(msg)) if err != nil { c.Disconnect() break out } case <-c.disconnect: break out } } // Drain any channels before exiting so nothing is left waiting around // to send. cleanup: for { select { case <-c.sendChan: default: break cleanup } } c.wg.Done() log.Tracef("RPC client output handler done for %s", c.config.Host) } // sendMessage sends the passed JSON to the connected server using the // websocket connection. It is backed by a buffered channel, so it will not // block until the send channel is full. func (c *Client) sendMessage(marshalledJSON []byte) { // Don't send the message if disconnected. if c.Disconnected() { return } c.sendChan <- marshalledJSON } // resendCmds resends any commands that had not completed when the client // disconnected. It is intended to be called once the client has reconnected. func (c *Client) resendCmds() { // Since it's possible to block on send and more commands might be // added by the caller while resending, make a copy of all of the // commands that need to be resent now and work from the copy. This // also allows the lock to be released quickly. c.requestLock.Lock() resendCmds := make([]*jsonRequest, 0, c.requestList.Len()) for e := c.requestList.Front(); e != nil; e = e.Next() { req := e.Value.(*jsonRequest) resendCmds = append(resendCmds, req) } c.requestLock.Unlock() for _, req := range resendCmds { // Stop resending commands if the client disconnected again // since the next reconnect will handle them. if c.Disconnected() { return } c.marshalAndSend(req.cmd, req.responseChan) time.Sleep(time.Second * 2) } } // wsReconnectHandler listens for client disconnects and automatically tries // to reconnect with retry interval that scales based on the number of retries. // It also resends any commands that had not completed when the client // disconnected so the disconnect/reconnect process is largely transparent to // the caller. This function is not run when the DisableAutoReconnect config // options is set. // // This function must be run as a goroutine. func (c *Client) wsReconnectHandler() { out: for { select { case <-c.disconnect: // On disconnect, fallthrough to reestablish the // connection. case <-c.shutdown: break out } reconnect: for { select { case <-c.shutdown: break out default: } wsConn, err := dial(c.config) if err != nil { c.retryCount++ log.Infof("Failed to connect to %s: %v", c.config.Host, err) // Scale the retry interval by the number of // retries so there is a backoff up to a max // of 1 minute. scaledInterval := connectionRetryInterval.Nanoseconds() * c.retryCount scaledDuration := time.Duration(scaledInterval) if scaledDuration > time.Minute { scaledDuration = time.Minute } log.Infof("Retrying connection to %s in "+ "%s", c.config.Host, scaledDuration) time.Sleep(scaledDuration) continue reconnect } log.Infof("Reestablished connection to RPC server %s", c.config.Host) // Reset the connection state and signal the reconnect // has happened. c.wsConn = wsConn c.retryCount = 0 c.disconnect = make(chan struct{}) c.mtx.Lock() c.disconnected = false c.mtx.Unlock() // Start processing input and output for the // new connection. c.start() // Reissue pending commands in another goroutine since // the send can block. go c.resendCmds() // Break out of the reconnect loop back to wait for // disconnect again. break reconnect } } c.wg.Done() log.Tracef("RPC client reconnect handler done for %s", c.config.Host) } // handleSendPostMessage handles performing the passed HTTP request, reading the // result, unmarshalling it, and delivering the unmarhsalled result to the // provided response channel. func (c *Client) handleSendPostMessage(details *sendPostDetails) { // Post the request. cmd := details.command log.Tracef("Sending command [%s] with id %d", cmd.Method(), cmd.Id()) httpResponse, err := c.httpClient.Do(details.request) if err != nil { details.responseChan <- &futureResult{reply: nil, err: err} return } // Read the raw bytes and close the response. respBytes, err := btcjson.GetRaw(httpResponse.Body) if err != nil { details.responseChan <- &futureResult{reply: nil, err: err} return } // Unmarshal the reply into a concrete result if possible. reply, err := btcjson.ReadResultCmd(cmd.Method(), respBytes) if err != nil { details.responseChan <- &futureResult{reply: nil, err: err} return } details.responseChan <- &futureResult{reply: &reply, err: nil} } // sendPostHandler handles all outgoing messages when the client is running // in HTTP POST mode. It uses a buffered channel to serialize output messages // while allowing the sender to continue running asynchronously. It must be run // as a goroutine. func (c *Client) sendPostHandler() { out: for { // Send any messages ready for send until the shutdown channel // is closed. select { case details := <-c.sendPostChan: c.handleSendPostMessage(details) case <-c.shutdown: break out } } // Drain any wait channels before exiting so nothing is left waiting // around to send. cleanup: for { select { case details := <-c.sendPostChan: details.responseChan <- &futureResult{ reply: nil, err: ErrClientShutdown, } default: break cleanup } } c.wg.Done() log.Tracef("RPC client send handler done for %s", c.config.Host) } // sendPostRequest sends the passed HTTP request to the RPC server using the // HTTP client associated with the client. It is backed by a buffered channel, // so it will not block until the send channel is full. func (c *Client) sendPostRequest(req *http.Request, command btcjson.Cmd, responseChan chan *futureResult) { // Don't send the message if shutting down. select { case <-c.shutdown: responseChan <- &futureResult{reply: nil, err: ErrClientShutdown} default: } c.sendPostChan <- &sendPostDetails{ request: req, command: command, responseChan: responseChan, } } // Disconnected returns whether or not the server is disconnected. func (c *Client) Disconnected() bool { c.mtx.Lock() defer c.mtx.Unlock() return c.disconnected } // newFutureError returns a new future result channel that already has the // passed error waitin on the channel with the reply set to nil. This is useful // to easily return errors from the various Async functions. func newFutureError(err error) chan *futureResult { responseChan := make(chan *futureResult, 1) responseChan <- &futureResult{err: err} return responseChan } // receiveFuture receives from the passed futureResult channel to extract a // reply or any errors. The examined errors include an error in the // futureResult and the error in the reply from the server. This will block // until the result is available on the passed channel. func receiveFuture(responseChan chan *futureResult) (interface{}, error) { // Wait for a response on the returned channel. response := <-responseChan if response.err != nil { return nil, response.err } // At this point, the command was either sent to the server and // there is a response from it, or it is intentionally a nil result // used to bybass sends for cases such a requesting notifications when // there are no handlers. reply := response.reply if reply == nil { return nil, nil } if reply.Error != nil { return nil, reply.Error } return reply.Result, nil } // marshalAndSendPost marshals the passed command to JSON-RPC and sends it to // the server by issuing an HTTP POST request and returns a response channel // on which the reply will be delivered. Typically a new connection is opened // and closed for each command when using this method, however, the underlying // HTTP client might coalesce multiple commands depending on several factors // including the remote server configuration. func (c *Client) marshalAndSendPost(cmd btcjson.Cmd, responseChan chan *futureResult) { marshalledJSON, err := json.Marshal(cmd) if err != nil { responseChan <- &futureResult{reply: nil, err: err} return } // Generate a request to the configured RPC server. protocol := "http" if !c.config.DisableTLS { protocol = "https" } url := protocol + "://" + c.config.Host req, err := http.NewRequest("POST", url, bytes.NewReader(marshalledJSON)) if err != nil { responseChan <- &futureResult{reply: nil, err: err} return } req.Close = true req.Header.Set("Content-Type", "application/json") // Configure basic access authorization. req.SetBasicAuth(c.config.User, c.config.Pass) log.Tracef("Sending command [%s] with id %d", cmd.Method(), cmd.Id()) c.sendPostRequest(req, cmd, responseChan) } // marshalAndSend marshals the passed command to JSON-RPC and sends it to the // server. It returns a response channel on which the reply will be delivered. func (c *Client) marshalAndSend(cmd btcjson.Cmd, responseChan chan *futureResult) { marshalledJSON, err := json.Marshal(cmd) if err != nil { responseChan <- &futureResult{reply: nil, err: err} return } log.Tracef("Sending command [%s] with id %d", cmd.Method(), cmd.Id()) c.sendMessage(marshalledJSON) } // sendCmd sends the passed command to the associated server and returns a // response channel on which the reply will be deliver at some point in the // future. It handles both websocket and HTTP POST mode depending on the // configuration of the client. func (c *Client) sendCmd(cmd btcjson.Cmd) chan *futureResult { // Choose which marshal and send function to use depending on whether // the client running in HTTP POST mode or not. When running in HTTP // POST mode, the command is issued via an HTTP client. Otherwise, // the command is issued via the asynchronous websocket channels. responseChan := make(chan *futureResult, 1) if c.config.HttpPostMode { c.marshalAndSendPost(cmd, responseChan) return responseChan } c.addRequest(cmd.Id().(int64), &jsonRequest{ cmd: cmd, responseChan: responseChan, }) c.marshalAndSend(cmd, responseChan) return responseChan } // sendCmdAndWait sends the passed command to the associated server, waits // for the reply, and returns the result from it. It will return the error // field in the reply if there is one. func (c *Client) sendCmdAndWait(cmd btcjson.Cmd) (interface{}, error) { // Marshal the command to JSON-RPC, send it to the connected server, and // wait for a response on the returned channel. return receiveFuture(c.sendCmd(cmd)) } // Disconnect disconnects the current websocket associated with the client. The // connection will automatically be re-established // // This function has no effect when the client is running in HTTP POST mode. func (c *Client) Disconnect() { if c.config.HttpPostMode { return } c.mtx.Lock() defer c.mtx.Unlock() // Nothing to do if already disconnected. if c.disconnected { return } log.Tracef("Disconnecting RPC client %s", c.config.Host) close(c.disconnect) c.wsConn.Close() c.disconnected = true // When operating without auto reconnect, send errors to any pending // requests and shutdown the client. if c.config.DisableAutoReconnect { c.requestLock.Lock() for e := c.requestList.Front(); e != nil; e = e.Next() { req := e.Value.(*jsonRequest) req.responseChan <- &futureResult{ reply: nil, err: ErrClientDisconnect, } } c.requestLock.Unlock() c.removeAllRequests() c.Shutdown() } } // Shutdown shuts down the client by disconnecting any connections associated // with the client and, when automatic reconnect is enabled, preventing future // attempts to reconnect. It also stops all goroutines. func (c *Client) Shutdown() { // Ignore the shutdown request if the client is already in the process // of shutting down or already shutdown. select { case <-c.shutdown: return default: } log.Tracef("Shutting down RPC client %s", c.config.Host) close(c.shutdown) // Send the ErrClientShutdown error to any pending requests. c.requestLock.Lock() for e := c.requestList.Front(); e != nil; e = e.Next() { req := e.Value.(*jsonRequest) req.responseChan <- &futureResult{ reply: nil, err: ErrClientShutdown, } } c.requestLock.Unlock() c.removeAllRequests() c.Disconnect() } // Start begins processing input and output messages. func (c *Client) start() { log.Tracef("Starting RPC client %s", c.config.Host) // Start the I/O processing handlers depending on whether the client is // in HTTP POST mode or the default websocket mode. if c.config.HttpPostMode { c.wg.Add(1) go c.sendPostHandler() } else { c.wg.Add(2) go c.wsInHandler() go c.wsOutHandler() } } // WaitForShutdown blocks until the client goroutines are stopped and the // connection is closed. func (c *Client) WaitForShutdown() { c.wg.Wait() } // ConnConfig describes the connection configuration parameters for the client. // This type ConnConfig struct { // Host is the IP address and port of the RPC server you want to connect // to. Host string // Endpoint is the websocket endpoint on the RPC server. This is // typically "ws" or "frontend". Endpoint string // User is the username to use to authenticate to the RPC server. User string // Pass is the passphrase to use to authenticate to the RPC server. Pass string // DisableTLS specifies whether transport layer security should be // disabled. It is recommended to always use TLS if the RPC server // supports it as otherwise your username and password is sent across // the wire in cleartext. DisableTLS bool // Certificates are the bytes for a PEM-encoded certificate chain used // for the TLS connection. It has no effect if the DisableTLS parameter // is true. Certificates []byte // Proxy specifies to connect through a SOCKS 5 proxy server. It may // be an empty string if a proxy is not required. Proxy string // ProxyUser is an optional username to use for the proxy server if it // requires authentication. It has no effect if the Proxy parameter // is not set. ProxyUser string // ProxyPass is an optional password to use for the proxy server if it // requires authentication. It has no effect if the Proxy parameter // is not set. ProxyPass string // DisableAutoReconnect specifies the client should not automatically // try to reconnect to the server when it has been disconnected. DisableAutoReconnect bool // HttpPostMode instructs the client to run using multiple independent // connections issuing HTTP POST requests instead of using the default // of websockets. Websockets are generally preferred as some of the // features of the client such notifications only work with websockets, // however, not all servers support the websocket extensions, so this // flag can be set to true to use basic HTTP POST requests instead. HttpPostMode bool } // newHttpClient returns a new http client that is configured according to the // proxy and TLS settings in the associated connection configuration. func newHttpClient(config *ConnConfig) (*http.Client, error) { // Set proxy function if there is a proxy configured. var proxyFunc func(*http.Request) (*url.URL, error) if config.Proxy != "" { proxyURL, err := url.Parse(config.Proxy) if err != nil { return nil, err } proxyFunc = http.ProxyURL(proxyURL) } // Configure TLS if needed. var tlsConfig *tls.Config if !config.DisableTLS { pool := x509.NewCertPool() pool.AppendCertsFromPEM(config.Certificates) tlsConfig = &tls.Config{ RootCAs: pool, } } client := http.Client{ Transport: &http.Transport{ Proxy: proxyFunc, TLSClientConfig: tlsConfig, }, } return &client, nil } // dial opens a websocket connection using the passed connection configuration // details. func dial(config *ConnConfig) (*websocket.Conn, error) { // Connect to websocket. url := fmt.Sprintf("wss://%s/%s", config.Host, config.Endpoint) wsConfig, err := websocket.NewConfig(url, "https://localhost/") if err != nil { return nil, err } pool := x509.NewCertPool() pool.AppendCertsFromPEM(config.Certificates) wsConfig.TlsConfig = &tls.Config{ RootCAs: pool, MinVersion: tls.VersionTLS12, } // The wallet requires basic authorization, so use a custom config with // with the Authorization header set. login := config.User + ":" + config.Pass auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) wsConfig.Header.Add("Authorization", auth) // Attempt to connect to running wallet instance using a proxy if one // is configured. if config.Proxy != "" { proxy := &socks.Proxy{ Addr: config.Proxy, Username: config.ProxyUser, Password: config.ProxyPass, } conn, err := proxy.Dial("tcp", config.Host) if err != nil { return nil, err } tlsConn := tls.Client(conn, wsConfig.TlsConfig) ws, err := websocket.NewClient(wsConfig, tlsConn) if err != nil { return nil, err } return ws, nil } // No proxy was specified, so attempt to connect to running wallet // instance directly. ws, err := websocket.DialConfig(wsConfig) if err != nil { // XXX(davec): This is not really accurate, but unfortunately // the current websocket package does not expose the status // code, so it's impossible to tell for sure. if dialError, ok := err.(*websocket.DialError); ok { if dialError.Err == websocket.ErrBadStatus { return nil, ErrInvalidAuth } } return nil, err } return ws, nil } // New create a new RPC client based on the provided connection configuration // details. The notification handlers parameter may be nil if you are not // interested in receiving notifications and will be ignored when if the // configuration is set to run in HTTP POST mode. func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error) { // Either open a websocket connection or create an HTTP client depending // on the HTTP POST mode. Also, set the notification handlers to nil // when running in HTTP POST mode. var wsConn *websocket.Conn var httpClient *http.Client if config.HttpPostMode { ntfnHandlers = nil var err error httpClient, err = newHttpClient(config) if err != nil { return nil, err } } else { var err error wsConn, err = dial(config) if err != nil { return nil, err } } client := &Client{ config: config, wsConn: wsConn, httpClient: httpClient, requestMap: make(map[int64]*list.Element), requestList: list.New(), ntfnHandlers: ntfnHandlers, sendChan: make(chan []byte, sendBufferSize), sendPostChan: make(chan *sendPostDetails, sendPostBufferSize), disconnect: make(chan struct{}), shutdown: make(chan struct{}), } client.start() if !client.config.HttpPostMode && !client.config.DisableAutoReconnect { client.wg.Add(1) go client.wsReconnectHandler() } return client, nil }