// Copyright (c) 2014-2015 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package btcrpcclient

import (


// Sentinel errors returned by the client.  Callers may compare returned
// errors against these values.
var (
	// ErrInvalidAuth is an error to describe the condition where the client
	// is either unable to authenticate or the specified endpoint is
	// incorrect.
	ErrInvalidAuth = errors.New("authentication failure")

	// ErrInvalidEndpoint is an error to describe the condition where the
	// websocket handshake failed with the specified endpoint.
	ErrInvalidEndpoint = errors.New("the endpoint either does not support " +
		"websockets or does not exist")

	// ErrClientNotConnected is an error to describe the condition where a
	// websocket client has been created, but the connection was never
	// established.  This condition differs from ErrClientDisconnect, which
	// represents an established connection that was lost.
	ErrClientNotConnected = errors.New("the client was never connected")

	// ErrClientDisconnect is an error to describe the condition where the
	// client has been disconnected from the RPC server.  When the
	// DisableAutoReconnect option is not set, any outstanding futures
	// when a client disconnect occurs will return this error as will
	// any new requests.
	ErrClientDisconnect = errors.New("the client has been disconnected")

	// ErrClientShutdown is an error to describe the condition where the
	// client is either already shutdown, or in the process of shutting
	// down.  Any outstanding futures when a client shutdown occurs will
	// return this error as will any new requests.
	ErrClientShutdown = errors.New("the client has been shutdown")

	// ErrNotWebsocketClient is an error to describe the condition of
	// calling a Client method intended for a websocket client when the
	// client has been configured to run in HTTP POST mode instead.
	ErrNotWebsocketClient = errors.New("client is not configured for " +
		"websockets")

	// ErrClientAlreadyConnected is an error to describe the condition where
	// a new client connection cannot be established due to a websocket
	// client having already connected to the RPC server.
	ErrClientAlreadyConnected = errors.New("websocket client has already " +
		"connected")
)

// Tunables for the client's send queues and reconnect behavior.
const (
	// sendBufferSize is the number of elements the websocket send channel
	// can queue before blocking.
	sendBufferSize = 50

	// sendPostBufferSize is the number of elements the HTTP POST send
	// channel can queue before blocking.
	sendPostBufferSize = 100

	// connectionRetryInterval is the amount of time to wait in between
	// retries when automatically reconnecting to an RPC server.
	connectionRetryInterval = time.Second * 5
)

// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
type sendPostDetails struct {
	command      btcjson.Cmd
	request      *http.Request
	responseChan chan *response

// jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it.
type jsonRequest struct {
	cmd          btcjson.Cmd
	responseChan chan *response

// Client represents a Bitcoin RPC client which allows easy access to the
// various RPC methods available on a Bitcoin RPC server.  Each of the wrapper
// functions handle the details of converting the passed and return types to and
// from the underlying JSON types which are required for the JSON-RPC
// invocations
// The client provides each RPC in both synchronous (blocking) and asynchronous
// (non-blocking) forms.  The asynchronous forms are based on the concept of
// futures where they return an instance of a type that promises to deliver the
// result of the invocation at some future time.  Invoking the Receive method on
// the returned future will block until the result is available if it's not
// already.
type Client struct {
	id uint64 // atomic, so must stay 64-bit aligned

	// config holds the connection configuration assoiated with this client.
	config *ConnConfig

	// wsConn is the underlying websocket connection when not in HTTP POST
	// mode.
	wsConn *websocket.Conn

	// httpClient is the underlying HTTP client to use when running in HTTP
	// POST mode.
	httpClient *http.Client

	// mtx is a mutex to protect access to connection related fields.
	mtx sync.Mutex

	// disconnected indicated whether or not the server is disconnected.
	disconnected bool

	// retryCount holds the number of times the client has tried to
	// reconnect to the RPC server.
	retryCount int64

	// Track command and their response channels by ID.
	requestLock sync.Mutex
	requestMap  map[uint64]*list.Element
	requestList *list.List

	// Notifications.
	ntfnHandlers *NotificationHandlers
	ntfnState    *notificationState

	// Networking infrastructure.
	sendChan        chan []byte
	sendPostChan    chan *sendPostDetails
	connEstablished chan struct{}
	disconnect      chan struct{}
	shutdown        chan struct{}
	wg              sync.WaitGroup

// NextID returns the next id to be used when sending a JSON-RPC message.  This
// ID allows responses to be associated with particular requests per the
// JSON-RPC specification.  Typically the consumer of the client does not need
// to call this function, however, if a custom request is being created and used
// this function should be used to ensure the ID is unique amongst all requests
// being made.
func (c *Client) NextID() uint64 {
	return atomic.AddUint64(&c.id, 1)

// addRequest associates the passed jsonRequest with the passed id.  This allows
// the response from the remote server to be unmarshalled to the appropriate
// type and sent to the specified channel when it is received.
// If the client has already begun shutting down, ErrClientShutdown is returned
// and the request is not added.
// This function is safe for concurrent access.
func (c *Client) addRequest(id uint64, request *jsonRequest) error {
	defer c.requestLock.Unlock()

	// A non-blocking read of the shutdown channel with the request lock
	// held avoids adding the request to the client's internal data
	// structures if the client is in the process of shutting down (and
	// has not yet grabbed the request lock), or has finished shutdown
	// already (responding to each outstanding request with
	// ErrClientShutdown).
	select {
	case <-c.shutdown:
		return ErrClientShutdown

	// TODO(davec): Already there?
	element := c.requestList.PushBack(request)
	c.requestMap[id] = element
	return nil

// removeRequest returns and removes the jsonRequest which contains the response
// channel and original method associated with the passed id or nil if there is
// no association.
// This function is safe for concurrent access.
func (c *Client) removeRequest(id uint64) *jsonRequest {
	defer c.requestLock.Unlock()

	element := c.requestMap[id]
	if element != nil {
		delete(c.requestMap, id)
		request := c.requestList.Remove(element).(*jsonRequest)
		return request

	return nil

// removeAllRequests removes all the jsonRequests which contain the response
// channels for outstanding requests.
// This function MUST be called with the request lock held.
func (c *Client) removeAllRequests() {
	c.requestMap = make(map[uint64]*list.Element)

// trackRegisteredNtfns examines the passed command to see if it is one of
// the notification commands and updates the notification state that is used
// to automatically re-establish registered notifications on reconnects.
func (c *Client) trackRegisteredNtfns(cmd btcjson.Cmd) {
	// Nothing to do if the caller is not interested in notifications.
	if c.ntfnHandlers == nil {

	defer c.ntfnState.Unlock()

	switch bcmd := cmd.(type) {
	case *btcws.NotifyBlocksCmd:
		c.ntfnState.notifyBlocks = true

	case *btcws.NotifyNewTransactionsCmd:
		if bcmd.Verbose {
			c.ntfnState.notifyNewTxVerbose = true
		} else {
			c.ntfnState.notifyNewTx = true


	case *btcws.NotifySpentCmd:
		for _, op := range bcmd.OutPoints {
			c.ntfnState.notifySpent[op] = struct{}{}

	case *btcws.NotifyReceivedCmd:
		for _, addr := range bcmd.Addresses {
			c.ntfnState.notifyReceived[addr] = struct{}{}

type (
	// inMessage is the first type that an incoming message is unmarshaled
	// into. It supports both requests (for notification support) and
	// responses.  The partially-unmarshaled message is a notification if
	// the embedded ID (from the response) is nil.  Otherwise, it is a
	// response.
	inMessage struct {
		ID *uint64 `json:"id"`

	// rawNotification is a partially-unmarshaled JSON-RPC notification.
	rawNotification struct {
		Method string            `json:"method"`
		Params []json.RawMessage `json:"params"`

	// rawResponse is a partially-unmarshaled JSON-RPC response.  For this
	// to be valid (according to JSON-RPC 1.0 spec), ID may not be nil.
	rawResponse struct {
		Result json.RawMessage `json:"result"`
		Error  *btcjson.Error  `json:"error"`

// response is the raw bytes of a JSON-RPC result, or the error if the response
// error object was non-null.
type response struct {
	result []byte // raw JSON of the result
	err    error  // non-nil when the request failed
}

// result checks whether the unmarshaled response contains a non-nil error,
// returning an unmarshaled btcjson.Error (or an unmarshaling error) if so.
// If the response is not an error, the raw bytes of the request are
// returned for further unmashaling into specific result types.
func (r rawResponse) result() (result []byte, err error) {
	if r.Error != nil {
		return nil, r.Error
	return r.Result, nil

// handleMessage is the main handler for incoming notifications and responses.
func (c *Client) handleMessage(msg []byte) {
	// Attempt to unmarshal the message as either a notifiation or response.
	var in inMessage
	err := json.Unmarshal(msg, &in)
	if err != nil {
		log.Warnf("Remote server sent invalid message: %v", err)

	// JSON-RPC 1.0 notifications are requests with a null id.
	if in.ID == nil {
		ntfn := in.rawNotification
		if ntfn == nil {
			log.Warn("Malformed notification: missing " +
				"method and parameters")
		if ntfn.Method == "" {
			log.Warn("Malformed notification: missing method")
		// params are not optional: nil isn't valid (but len == 0 is)
		if ntfn.Params == nil {
			log.Warn("Malformed notification: missing params")
		// Deliver the notification.
		log.Tracef("Received notification [%s]", in.Method)

	if in.rawResponse == nil {
		log.Warn("Malformed response: missing result and error")

	id := *in.ID
	log.Tracef("Received response for id %d (result %s)", id, in.Result)
	request := c.removeRequest(id)

	// Nothing more to do if there is no request associated with this reply.
	if request == nil || request.responseChan == nil {
		log.Warnf("Received unexpected reply: %s (id %d)", in.Result,

	// Since the command was successful, examine it to see if it's a
	// notification, and if is, add it to the notification state so it
	// can automatically be re-established on reconnect.

	// Deliver the response.
	result, err := in.rawResponse.result()
	request.responseChan <- &response{result: result, err: err}

// wsInHandler handles all incoming messages for the websocket connection
// associated with the client.  It must be run as a goroutine.
func (c *Client) wsInHandler() {
	for {
		// Break out of the loop once the shutdown channel has been
		// closed.  Use a non-blocking select here so we fall through
		// otherwise.
		select {
		case <-c.shutdown:
			break out

		_, msg, err := c.wsConn.ReadMessage()
		if err != nil {
			// Log the error if it's not due to disconnecting.
			if _, ok := err.(*net.OpError); !ok {
				log.Errorf("Websocket receive error from "+
					"%s: %v", c.config.Host, err)
			break out

	// Ensure the connection is closed.
	log.Tracef("RPC client input handler done for %s", c.config.Host)

// wsOutHandler handles all outgoing messages for the websocket connection.  It
// uses a buffered channel to serialize output messages while allowing the
// sender to continue running asynchronously.  It must be run as a goroutine.
func (c *Client) wsOutHandler() {
	for {
		// Send any messages ready for send until the client is
		// disconnected closed.
		select {
		case msg := <-c.sendChan:
			err := c.wsConn.WriteMessage(websocket.TextMessage, msg)
			if err != nil {
				break out

		case <-c.disconnect:
			break out

	// Drain any channels before exiting so nothing is left waiting around
	// to send.
	for {
		select {
		case <-c.sendChan:
			break cleanup
	log.Tracef("RPC client output handler done for %s", c.config.Host)

// sendMessage sends the passed JSON to the connected server using the
// websocket connection.  It is backed by a buffered channel, so it will not
// block until the send channel is full.
func (c *Client) sendMessage(marshalledJSON []byte) {
	// Don't send the message if disconnected.
	select {
	case c.sendChan <- marshalledJSON:
	case <-c.disconnect:

// reregisterNtfns creates and sends commands needed to re-establish the current
// notification state associated with the client.  It should only be called on
// on reconnect by the resendCmds function.
func (c *Client) reregisterNtfns() error {
	// Nothing to do if the caller is not interested in notifications.
	if c.ntfnHandlers == nil {
		return nil

	// In order to avoid holding the lock on the notification state for the
	// entire time of the potentially long running RPCs issued below, make a
	// copy of it and work from that.
	// Also, other commands will be running concurrently which could modify
	// the notification state (while not under the lock of course) which
	// also register it with the remote RPC server, so this prevents double
	// registrations.
	stateCopy := c.ntfnState.Copy()

	// Reregister notifyblocks if needed.
	if stateCopy.notifyBlocks {
		log.Debugf("Reregistering [notifyblocks]")
		if err := c.NotifyBlocks(); err != nil {
			return err

	// Reregister notifynewtransactions if needed.
	if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
		log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)",
		err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose)
		if err != nil {
			return err

	// Reregister the combination of all previously registered notifyspent
	// outpoints in one command if needed.
	nslen := len(stateCopy.notifySpent)
	if nslen > 0 {
		outpoints := make([]btcws.OutPoint, 0, nslen)
		for op := range stateCopy.notifySpent {
			outpoints = append(outpoints, op)
		log.Debugf("Reregistering [notifyspent] outpoints: %v", outpoints)
		if err := c.notifySpentInternal(outpoints).Receive(); err != nil {
			return err

	// Reregister the combination of all previously registered
	// notifyreceived addresses in one command if needed.
	nrlen := len(stateCopy.notifyReceived)
	if nrlen > 0 {
		addresses := make([]string, 0, nrlen)
		for addr := range stateCopy.notifyReceived {
			addresses = append(addresses, addr)
		log.Debugf("Reregistering [notifyreceived] addresses: %v", addresses)
		if err := c.notifyReceivedInternal(addresses).Receive(); err != nil {
			return err

	return nil

// ignoreResends is a set of all methods for requests that are "long running"
// and are not to be reissued by the client on reconnect.
var ignoreResends = map[string]struct{}{
	"rescan": struct{}{},
}

// resendCmds resends any commands that had not completed when the client
// disconnected.  It is intended to be called once the client has reconnected as
// a separate goroutine.
func (c *Client) resendCmds() {
	// Set the notification state back up.  If anything goes wrong,
	// disconnect the client.
	if err := c.reregisterNtfns(); err != nil {
		log.Warnf("Unable to re-establish notification state: %v", err)

	// Since it's possible to block on send and more commands might be
	// added by the caller while resending, make a copy of all of the
	// commands that need to be resent now and work from the copy.  This
	// also allows the lock to be released quickly.
	resendCmds := make([]*jsonRequest, 0, c.requestList.Len())
	var nextElem *list.Element
	for e := c.requestList.Front(); e != nil; e = nextElem {
		nextElem = e.Next()

		req := e.Value.(*jsonRequest)
		if _, ok := ignoreResends[req.cmd.Method()]; ok {
			// If a request is not sent on reconnect, remove it
			// from the request structures, since no reply is
			// expected.
			delete(c.requestMap, req.cmd.Id().(uint64))
		} else {
			resendCmds = append(resendCmds, req)

	for _, req := range resendCmds {
		// Stop resending commands if the client disconnected again
		// since the next reconnect will handle them.
		if c.Disconnected() {

		c.marshalAndSend(req.cmd, req.responseChan)

// wsReconnectHandler listens for client disconnects and automatically tries
// to reconnect with retry interval that scales based on the number of retries.
// It also resends any commands that had not completed when the client
// disconnected so the disconnect/reconnect process is largely transparent to
// the caller.  This function is not run when the DisableAutoReconnect config
// options is set.
// This function must be run as a goroutine.
func (c *Client) wsReconnectHandler() {
	for {
		select {
		case <-c.disconnect:
			// On disconnect, fallthrough to reestablish the
			// connection.

		case <-c.shutdown:
			break out

		for {
			select {
			case <-c.shutdown:
				break out

			wsConn, err := dial(c.config)
			if err != nil {
				log.Infof("Failed to connect to %s: %v",
					c.config.Host, err)

				// Scale the retry interval by the number of
				// retries so there is a backoff up to a max
				// of 1 minute.
				scaledInterval := connectionRetryInterval.Nanoseconds() * c.retryCount
				scaledDuration := time.Duration(scaledInterval)
				if scaledDuration > time.Minute {
					scaledDuration = time.Minute
				log.Infof("Retrying connection to %s in "+
					"%s", c.config.Host, scaledDuration)
				continue reconnect

			log.Infof("Reestablished connection to RPC server %s",

			// Reset the connection state and signal the reconnect
			// has happened.
			c.wsConn = wsConn
			c.retryCount = 0
			c.disconnect = make(chan struct{})

			c.disconnected = false

			// Start processing input and output for the
			// new connection.

			// Reissue pending commands in another goroutine since
			// the send can block.
			go c.resendCmds()

			// Break out of the reconnect loop back to wait for
			// disconnect again.
			break reconnect
	log.Tracef("RPC client reconnect handler done for %s", c.config.Host)

// handleSendPostMessage handles performing the passed HTTP request, reading the
// result, unmarshalling it, and delivering the unmarhsalled result to the
// provided response channel.
func (c *Client) handleSendPostMessage(details *sendPostDetails) {
	// Post the request.
	cmd := details.command
	log.Tracef("Sending command [%s] with id %d", cmd.Method(), cmd.Id())
	httpResponse, err := c.httpClient.Do(details.request)
	if err != nil {
		details.responseChan <- &response{err: err}

	// Read the raw bytes and close the response.
	respBytes, err := btcjson.GetRaw(httpResponse.Body)
	if err != nil {
		details.responseChan <- &response{err: err}

	// Handle unsuccessful HTTP responses
	if httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 {
		details.responseChan <- &response{err: errors.New(string(respBytes))}

	var resp rawResponse
	err = json.Unmarshal(respBytes, &resp)
	if err != nil {
		details.responseChan <- &response{err: err}

	res, err := resp.result()
	details.responseChan <- &response{result: res, err: err}

// sendPostHandler handles all outgoing messages when the client is running
// in HTTP POST mode.  It uses a buffered channel to serialize output messages
// while allowing the sender to continue running asynchronously.  It must be run
// as a goroutine.
func (c *Client) sendPostHandler() {
	for {
		// Send any messages ready for send until the shutdown channel
		// is closed.
		select {
		case details := <-c.sendPostChan:

		case <-c.shutdown:
			break out

	// Drain any wait channels before exiting so nothing is left waiting
	// around to send.
	for {
		select {
		case details := <-c.sendPostChan:
			details.responseChan <- &response{
				result: nil,
				err:    ErrClientShutdown,

			break cleanup
	log.Tracef("RPC client send handler done for %s", c.config.Host)


// sendPostRequest sends the passed HTTP request to the RPC server using the
// HTTP client associated with the client.  It is backed by a buffered channel,
// so it will not block until the send channel is full.
func (c *Client) sendPostRequest(req *http.Request, command btcjson.Cmd, responseChan chan *response) {
	// Don't send the message if shutting down.
	select {
	case <-c.shutdown:
		responseChan <- &response{result: nil, err: ErrClientShutdown}

	c.sendPostChan <- &sendPostDetails{
		command:      command,
		request:      req,
		responseChan: responseChan,

// newFutureError returns a new future result channel that already has the
// passed error waitin on the channel with the reply set to nil.  This is useful
// to easily return errors from the various Async functions.
func newFutureError(err error) chan *response {
	responseChan := make(chan *response, 1)
	responseChan <- &response{err: err}
	return responseChan

// receiveFuture receives from the passed futureResult channel to extract a
// reply or any errors.  The examined errors include an error in the
// futureResult and the error in the reply from the server.  This will block
// until the result is available on the passed channel.
func receiveFuture(f chan *response) ([]byte, error) {
	// Wait for a response on the returned channel.
	r := <-f
	return r.result, r.err

// marshalAndSendPost marshals the passed command to JSON-RPC and sends it to
// the server by issuing an HTTP POST request and returns a response channel
// on which the reply will be delivered.  Typically a new connection is opened
// and closed for each command when using this method, however, the underlying
// HTTP client might coalesce multiple commands depending on several factors
// including the remote server configuration.
func (c *Client) marshalAndSendPost(cmd btcjson.Cmd, responseChan chan *response) {
	marshalledJSON, err := json.Marshal(cmd)
	if err != nil {
		responseChan <- &response{result: nil, err: err}

	// Generate a request to the configured RPC server.
	protocol := "http"
	if !c.config.DisableTLS {
		protocol = "https"
	url := protocol + "://" + c.config.Host
	req, err := http.NewRequest("POST", url, bytes.NewReader(marshalledJSON))
	if err != nil {
		responseChan <- &response{result: nil, err: err}
	req.Close = true
	req.Header.Set("Content-Type", "application/json")

	// Configure basic access authorization.
	req.SetBasicAuth(c.config.User, c.config.Pass)

	log.Tracef("Sending command [%s] with id %d", cmd.Method(), cmd.Id())
	c.sendPostRequest(req, cmd, responseChan)

// marshalAndSend marshals the passed command to JSON-RPC and sends it to the
// server.  It returns a response channel on which the reply will be delivered.
func (c *Client) marshalAndSend(cmd btcjson.Cmd, responseChan chan *response) {
	marshalledJSON, err := cmd.MarshalJSON()
	if err != nil {
		responseChan <- &response{result: nil, err: err}

	log.Tracef("Sending command [%s] with id %d", cmd.Method(), cmd.Id())

// sendCmd sends the passed command to the associated server and returns a
// response channel on which the reply will be deliver at some point in the
// future.  It handles both websocket and HTTP POST mode depending on the
// configuration of the client.
func (c *Client) sendCmd(cmd btcjson.Cmd) chan *response {
	// Choose which marshal and send function to use depending on whether
	// the client running in HTTP POST mode or not.  When running in HTTP
	// POST mode, the command is issued via an HTTP client.  Otherwise,
	// the command is issued via the asynchronous websocket channels.
	responseChan := make(chan *response, 1)
	if c.config.HttpPostMode {
		c.marshalAndSendPost(cmd, responseChan)
		return responseChan

	// Check whether the websocket connection has never been established,
	// in which case the handler goroutines are not running.
	select {
	case <-c.connEstablished:
		responseChan <- &response{err: ErrClientNotConnected}
		return responseChan

	err := c.addRequest(cmd.Id().(uint64), &jsonRequest{
		cmd:          cmd,
		responseChan: responseChan,
	if err != nil {
		responseChan <- &response{err: err}
		return responseChan
	c.marshalAndSend(cmd, responseChan)
	return responseChan

// sendCmdAndWait sends the passed command to the associated server, waits
// for the reply, and returns the result from it.  It will return the error
// field in the reply if there is one.
func (c *Client) sendCmdAndWait(cmd btcjson.Cmd) (interface{}, error) {
	// Marshal the command to JSON-RPC, send it to the connected server, and
	// wait for a response on the returned channel.
	return receiveFuture(c.sendCmd(cmd))

// Disconnected returns whether or not the server is disconnected.  If a
// websocket client was created but never connected, this also returns false.
func (c *Client) Disconnected() bool {
	defer c.mtx.Unlock()

	select {
	case <-c.connEstablished:
		return c.disconnected
		return false

// doDisconnect disconnects the websocket associated with the client if it
// hasn't already been disconnected.  It will return false if the disconnect is
// not needed or the client is running in HTTP POST mode.
// This function is safe for concurrent access.
func (c *Client) doDisconnect() bool {
	if c.config.HttpPostMode {
		return false

	defer c.mtx.Unlock()

	// Nothing to do if already disconnected.
	if c.disconnected {
		return false

	log.Tracef("Disconnecting RPC client %s", c.config.Host)
	if c.wsConn != nil {
	c.disconnected = true
	return true

// doShutdown closes the shutdown channel and logs the shutdown unless shutdown
// is already in progress.  It will return false if the shutdown is not needed.
// This function is safe for concurrent access.
func (c *Client) doShutdown() bool {
	// Ignore the shutdown request if the client is already in the process
	// of shutting down or already shutdown.
	select {
	case <-c.shutdown:
		return false

	log.Tracef("Shutting down RPC client %s", c.config.Host)
	return true

// Disconnect disconnects the current websocket associated with the client.  The
// connection will automatically be re-established unless the client was
// created with the DisableAutoReconnect flag.
// This function has no effect when the client is running in HTTP POST mode.
func (c *Client) Disconnect() {
	// Nothing to do if already disconnected or running in HTTP POST mode.
	if !c.doDisconnect() {

	defer c.requestLock.Unlock()

	// When operating without auto reconnect, send errors to any pending
	// requests and shutdown the client.
	if c.config.DisableAutoReconnect {
		for e := c.requestList.Front(); e != nil; e = e.Next() {
			req := e.Value.(*jsonRequest)
			req.responseChan <- &response{
				result: nil,
				err:    ErrClientDisconnect,

// Shutdown shuts down the client by disconnecting any connections associated
// with the client and, when automatic reconnect is enabled, preventing future
// attempts to reconnect.  It also stops all goroutines.
func (c *Client) Shutdown() {
	// Do the shutdown under the request lock to prevent clients from
	// adding new requests while the client shutdown process is initiated.
	defer c.requestLock.Unlock()

	// Ignore the shutdown request if the client is already in the process
	// of shutting down or already shutdown.
	if !c.doShutdown() {

	// Send the ErrClientShutdown error to any pending requests.
	for e := c.requestList.Front(); e != nil; e = e.Next() {
		req := e.Value.(*jsonRequest)
		req.responseChan <- &response{
			result: nil,
			err:    ErrClientShutdown,

	// Disconnect the client if needed.

// start begins processing input and output messages.
func (c *Client) start() {
	log.Tracef("Starting RPC client %s", c.config.Host)

	// Start the I/O processing handlers depending on whether the client is
	// in HTTP POST mode or the default websocket mode.
	if c.config.HttpPostMode {
		go c.sendPostHandler()
	} else {
		go func() {
			if c.ntfnHandlers != nil {
				if c.ntfnHandlers.OnClientConnected != nil {
		go c.wsInHandler()
		go c.wsOutHandler()

// WaitForShutdown blocks until the client goroutines are stopped and the
// connection is closed.
func (c *Client) WaitForShutdown() {

// ConnConfig describes the connection configuration parameters for the client.
type ConnConfig struct {
	// Host is the IP address and port of the RPC server you want to connect
	// to.
	Host string

	// Endpoint is the websocket endpoint on the RPC server.  This is
	// typically "ws".
	Endpoint string

	// User is the username to use to authenticate to the RPC server.
	User string

	// Pass is the passphrase to use to authenticate to the RPC server.
	Pass string

	// DisableTLS specifies whether transport layer security should be
	// disabled.  It is recommended to always use TLS if the RPC server
	// supports it as otherwise your username and password is sent across
	// the wire in cleartext.
	DisableTLS bool

	// Certificates are the bytes for a PEM-encoded certificate chain used
	// for the TLS connection.  It has no effect if the DisableTLS parameter
	// is true.
	Certificates []byte

	// Proxy specifies to connect through a SOCKS 5 proxy server.  It may
	// be an empty string if a proxy is not required.
	Proxy string

	// ProxyUser is an optional username to use for the proxy server if it
	// requires authentication.  It has no effect if the Proxy parameter
	// is not set.
	ProxyUser string

	// ProxyPass is an optional password to use for the proxy server if it
	// requires authentication.  It has no effect if the Proxy parameter
	// is not set.
	ProxyPass string

	// DisableAutoReconnect specifies the client should not automatically
	// try to reconnect to the server when it has been disconnected.
	DisableAutoReconnect bool

	// DisableConnectOnNew specifies that a websocket client connection
	// should not be tried when creating the client with New.  Instead, the
	// client is created and returned unconnected, and Connect must be
	// called manually.
	DisableConnectOnNew bool

	// HttpPostMode instructs the client to run using multiple independent
	// connections issuing HTTP POST requests instead of using the default
	// of websockets.  Websockets are generally preferred as some of the
	// features of the client such as notifications only work with
	// websockets, however, not all servers support the websocket
	// extensions, so this flag can be set to true to use basic HTTP POST
	// requests instead.
	HttpPostMode bool

	// EnableBCInfoHacks is an option provided to enable compatibility hacks
	// when connecting to blockchain.info RPC server.
	EnableBCInfoHacks bool
}

// newHTTPClient returns a new http client that is configured according to the
// proxy and TLS settings in the associated connection configuration.
func newHTTPClient(config *ConnConfig) (*http.Client, error) {
	// Set proxy function if there is a proxy configured.
	var proxyFunc func(*http.Request) (*url.URL, error)
	if config.Proxy != "" {
		proxyURL, err := url.Parse(config.Proxy)
		if err != nil {
			return nil, err
		proxyFunc = http.ProxyURL(proxyURL)

	// Configure TLS if needed.
	var tlsConfig *tls.Config
	if !config.DisableTLS {
		if len(config.Certificates) > 0 {
			pool := x509.NewCertPool()
			tlsConfig = &tls.Config{
				RootCAs: pool,

	client := http.Client{
		Transport: &http.Transport{
			Proxy:           proxyFunc,
			TLSClientConfig: tlsConfig,

	return &client, nil

// dial opens a websocket connection using the passed connection configuration
// details.
func dial(config *ConnConfig) (*websocket.Conn, error) {
	// Setup TLS if not disabled.
	var tlsConfig *tls.Config
	var scheme = "ws"
	if !config.DisableTLS {
		tlsConfig = &tls.Config{
			MinVersion: tls.VersionTLS12,
		if len(config.Certificates) > 0 {
			pool := x509.NewCertPool()
			tlsConfig.RootCAs = pool
		scheme = "wss"

	// Create a websocket dialer that will be used to make the connection.
	// It is modified by the proxy setting below as needed.
	dialer := websocket.Dialer{TLSClientConfig: tlsConfig}

	// Setup the proxy if one is configured.
	if config.Proxy != "" {
		proxy := &socks.Proxy{
			Addr:     config.Proxy,
			Username: config.ProxyUser,
			Password: config.ProxyPass,
		dialer.NetDial = proxy.Dial

	// The RPC server requires basic authorization, so create a custom
	// request header with the Authorization header set.
	login := config.User + ":" + config.Pass
	auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
	requestHeader := make(http.Header)
	requestHeader.Add("Authorization", auth)

	// Dial the connection.
	url := fmt.Sprintf("%s://%s/%s", scheme, config.Host, config.Endpoint)
	wsConn, resp, err := dialer.Dial(url, requestHeader)
	if err != nil {
		if err != websocket.ErrBadHandshake || resp == nil {
			return nil, err

		// Detect HTTP authentication error status codes.
		if resp.StatusCode == http.StatusUnauthorized ||
			resp.StatusCode == http.StatusForbidden {
			return nil, ErrInvalidAuth

		// The connection was authenticated and the status response was
		// ok, but the websocket handshake still failed, so the endpoint
		// is invalid in some way.
		if resp.StatusCode == http.StatusOK {
			return nil, ErrInvalidEndpoint

		// Return the status text from the server if none of the special
		// cases above apply.
		return nil, errors.New(resp.Status)
	return wsConn, nil

// New creates a new RPC client based on the provided connection configuration
// details.  The notification handlers parameter may be nil if you are not
// interested in receiving notifications and will be ignored when if the
// configuration is set to run in HTTP POST mode.
func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error) {
	// Either open a websocket connection or create an HTTP client depending
	// on the HTTP POST mode.  Also, set the notification handlers to nil
	// when running in HTTP POST mode.
	var wsConn *websocket.Conn
	var httpClient *http.Client
	connEstablished := make(chan struct{})
	var start bool
	if config.HttpPostMode {
		ntfnHandlers = nil
		start = true

		var err error
		httpClient, err = newHTTPClient(config)
		if err != nil {
			return nil, err
	} else {
		if !config.DisableConnectOnNew {
			var err error
			wsConn, err = dial(config)
			if err != nil {
				return nil, err
			start = true
	log.Infof("Established connection to RPC server %s",

	client := &Client{
		config:          config,
		wsConn:          wsConn,
		httpClient:      httpClient,
		requestMap:      make(map[uint64]*list.Element),
		requestList:     list.New(),
		ntfnHandlers:    ntfnHandlers,
		ntfnState:       newNotificationState(),
		sendChan:        make(chan []byte, sendBufferSize),
		sendPostChan:    make(chan *sendPostDetails, sendPostBufferSize),
		connEstablished: connEstablished,
		disconnect:      make(chan struct{}),
		shutdown:        make(chan struct{}),

	if start {
		if !client.config.HttpPostMode && !client.config.DisableAutoReconnect {
			go client.wsReconnectHandler()

	return client, nil

// Connect establishes the initial websocket connection.  This is necessary when
// a client was created after setting the DisableConnectOnNew field of the
// Config struct.
// Up to tries number of connections (each after an increasing backoff) will
// be tried if the connection can not be established.  The special value of 0
// indicates an unlimited number of connection attempts.
// This method will error if the client is not configured for websockets, if the
// connection has already been established, or if none of the connection
// attempts were successful.
func (c *Client) Connect(tries int) error {
	defer c.mtx.Unlock()

	if c.config.HttpPostMode {
		return ErrNotWebsocketClient
	if c.wsConn != nil {
		return ErrClientAlreadyConnected

	// Begin connection attempts.  Increase the backoff after each failed
	// attempt, up to a maximum of one minute.
	var err error
	var backoff time.Duration
	for i := 0; tries == 0 || i < tries; i++ {
		var wsConn *websocket.Conn
		wsConn, err = dial(c.config)
		if err != nil {
			backoff = connectionRetryInterval * time.Duration(i+1)
			if backoff > time.Minute {
				backoff = time.Minute

		// Connection was established.  Set the websocket connection
		// member of the client and start the goroutines necessary
		// to run the client.
		c.wsConn = wsConn
		if !c.config.DisableAutoReconnect {
			go c.wsReconnectHandler()
		return nil

	// All connection attempts failed, so return the last error.
	return err