Auto recreate notification state on reconnect.

This commit adds logic to track all registered notifications that have
been registered by the client in a notification state when the default
automatic reconnect is enabled.

The notification state is then used to reregister for all previously
registered notifications on reconnect.  This allows the caller to
continue receiving notifications across reconnect cycles.
This commit is contained in:
Dave Collins 2014-05-09 23:44:43 -05:00
parent 7cc356d4c7
commit a35c1e8ede
2 changed files with 209 additions and 3 deletions

View file

@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"github.com/conformal/btcjson"
"github.com/conformal/btcws"
"github.com/conformal/go-socks"
"github.com/gorilla/websocket"
"net"
@ -127,8 +128,9 @@ type Client struct {
requestMap map[uint64]*list.Element
requestList *list.List
// Notification handlers.
// Notifications.
ntfnHandlers *NotificationHandlers
ntfnState *notificationState
// Networking infrastructure.
sendChan chan []byte
@ -193,6 +195,42 @@ func (c *Client) removeAllRequests() {
c.requestList.Init()
}
// trackRegisteredNtfns examines the passed command to see if it is one of
// the notification commands and updates the notification state that is used
// to automatically re-establish registered notifications on reconnects.
func (c *Client) trackRegisteredNtfns(cmd btcjson.Cmd) {
// Nothing to do if the caller is not interested in notifications.
if c.ntfnHandlers == nil {
return
}
c.ntfnState.Lock()
defer c.ntfnState.Unlock()
switch bcmd := cmd.(type) {
case *btcws.NotifyBlocksCmd:
c.ntfnState.notifyBlocks = true
case *btcws.NotifyNewTransactionsCmd:
if bcmd.Verbose {
c.ntfnState.notifyNewTxVerbose = true
} else {
c.ntfnState.notifyNewTx = true
}
case *btcws.NotifySpentCmd:
for _, op := range bcmd.OutPoints {
c.ntfnState.notifySpent[op] = struct{}{}
}
case *btcws.NotifyReceivedCmd:
for _, addr := range bcmd.Addresses {
c.ntfnState.notifyReceived[addr] = struct{}{}
}
}
}
// handleMessage is the main handler for incoming requests. It enforces
// authentication, parses the incoming json, looks up and executes handlers
// (including pass through for standard RPC commands), sends the appropriate
@ -259,6 +297,13 @@ func (c *Client) handleMessage(msg []byte) {
request.responseChan <- &futureResult{reply: nil, err: err}
return
}
// Since the command was successful, examine it to see if it's a
// notification, and if is, add it to the notification state so it
// can automatically be re-established on reconnect.
c.trackRegisteredNtfns(request.cmd)
// Deliver the reply.
request.responseChan <- &futureResult{reply: &reply, err: nil}
}
@ -341,10 +386,86 @@ func (c *Client) sendMessage(marshalledJSON []byte) {
c.sendChan <- marshalledJSON
}
// reregisterNtfns creates and sends commands needed to re-establish the current
// notification state associated with the client. It should only be called on
// on reconnect by the resendCmds function.
func (c *Client) reregisterNtfns() error {
// Nothing to do if the caller is not interested in notifications.
if c.ntfnHandlers == nil {
return nil
}
// In order to avoid holding the lock on the notification state for the
// entire time of the potentially long running RPCs issued below, make a
// copy of it and work from that.
//
// Also, other commands will be running concurrently which could modify
// the notification state (while not under the lock of course) which
// also register it with the remote RPC server, so this prevents double
// registrations.
stateCopy := c.ntfnState.Copy()
// Reregister notifyblocks if needed.
if stateCopy.notifyBlocks {
log.Debugf("Reregistering [notifyblocks]")
if err := c.NotifyBlocks(); err != nil {
return err
}
}
// Reregister notifynewtransactions if needed.
if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)",
stateCopy.notifyNewTxVerbose)
err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose)
if err != nil {
return err
}
}
// Reregister the combination of all previously registered notifyspent
// outpoints in one command if needed.
nslen := len(stateCopy.notifySpent)
if nslen > 0 {
outpoints := make([]btcws.OutPoint, 0, nslen)
for op := range stateCopy.notifySpent {
outpoints = append(outpoints, op)
}
log.Debugf("Reregistering [notifyspent] outpoints: %v", outpoints)
if err := c.notifySpentInternal(outpoints).Receive(); err != nil {
return err
}
}
// Reregister the combination of all previously registered
// notifyreceived addresses in one command if needed.
nrlen := len(stateCopy.notifyReceived)
if nrlen > 0 {
addresses := make([]string, 0, nrlen)
for addr := range stateCopy.notifyReceived {
addresses = append(addresses, addr)
}
log.Debugf("Reregistering [notifyreceived] addresses: %v", addresses)
if err := c.notifyReceivedInternal(addresses).Receive(); err != nil {
return err
}
}
return nil
}
// resendCmds resends any commands that had not completed when the client
// disconnected. It is intended to be called once the client has reconnected
// as a separate goroutine.
// disconnected. It is intended to be called once the client has reconnected as
// a separate goroutine.
func (c *Client) resendCmds() {
// Set the notification state back up. If anything goes wrong,
// disconnect the client.
if err := c.reregisterNtfns(); err != nil {
log.Warnf("Unable to re-establish notification state: %v", err)
c.Disconnect()
return
}
// Since it's possible to block on send and more commands might be
// added by the caller while resending, make a copy of all of the
// commands that need to be resent now and work from the copy. This
@ -928,6 +1049,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
requestMap: make(map[uint64]*list.Element),
requestList: list.New(),
ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
disconnect: make(chan struct{}),

View file

@ -12,6 +12,7 @@ import (
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"github.com/conformal/btcws"
"sync"
)
var (
@ -23,6 +24,46 @@ var (
"supported when running in HTTP POST mode")
)
// notificationState is used to track the current state of successfuly
// registered notification so the state can be automatically re-established on
// reconnect.
type notificationState struct {
sync.Mutex
notifyBlocks bool
notifyNewTx bool
notifyNewTxVerbose bool
notifyReceived map[string]struct{}
notifySpent map[btcws.OutPoint]struct{}
}
// Copy returns a deep copy of the receiver.
//
// This function is safe for concurrent access.
func (s *notificationState) Copy() *notificationState {
s.Lock()
defer s.Unlock()
stateCopy := *s
stateCopy.notifyReceived = make(map[string]struct{})
for addr := range s.notifyReceived {
stateCopy.notifyReceived[addr] = struct{}{}
}
stateCopy.notifySpent = make(map[btcws.OutPoint]struct{})
for op := range s.notifySpent {
stateCopy.notifySpent[op] = struct{}{}
}
return &stateCopy
}
// newNotificationState returns a new notification state ready to be populated.
func newNotificationState() *notificationState {
return &notificationState{
notifyReceived: make(map[string]struct{}),
notifySpent: make(map[btcws.OutPoint]struct{}),
}
}
// newNilFutureResult returns a new future result channel that already has the
// result waiting on the channel with the reply set to nil. This is useful
// to ignore things such as notifications when the caller didn't specify any
@ -371,6 +412,27 @@ func (r FutureNotifySpentResult) Receive() error {
return nil
}
// notifySpentInternal is the same as notifySpentAsync except it accepts
// the converted outpoints as a parameter so the client can more efficiently
// recreate the previous notification state on reconnect.
func (c *Client) notifySpentInternal(outpoints []btcws.OutPoint) FutureNotifySpentResult {
// Not supported in HTTP POST mode.
if c.config.HttpPostMode {
return newFutureError(ErrNotificationsNotSupported)
}
// Ignore the notification if the client is not interested in
// notifications.
if c.ntfnHandlers == nil {
return newNilFutureResult()
}
id := c.NextID()
cmd := btcws.NewNotifySpentCmd(id, outpoints)
return c.sendCmd(cmd)
}
// NotifySpentAsync returns an instance of a type that can be used to get the
// result of the RPC at some future time by invoking the Receive function on
// the returned instance.
@ -487,6 +549,28 @@ func (r FutureNotifyReceivedResult) Receive() error {
return nil
}
// notifyReceivedInternal is the same as notifyReceivedAsync except it accepts
// the converted addresses as a parameter so the client can more efficiently
// recreate the previous notification state on reconnect.
func (c *Client) notifyReceivedInternal(addresses []string) FutureNotifyReceivedResult {
// Not supported in HTTP POST mode.
if c.config.HttpPostMode {
return newFutureError(ErrNotificationsNotSupported)
}
// Ignore the notification if the client is not interested in
// notifications.
if c.ntfnHandlers == nil {
return newNilFutureResult()
}
// Convert addresses to strings.
id := c.NextID()
cmd := btcws.NewNotifyReceivedCmd(id, addresses)
return c.sendCmd(cmd)
}
// NotifyReceivedAsync returns an instance of a type that can be used to get the
// result of the RPC at some future time by invoking the Receive function on
// the returned instance.