Send btcdconnected notifications without a wallet.

The notified chain server connection state was being passed through
the wallet and then notified to the RPC server, which prevented this
notification from ever firing if a wallet didn't exist yet.  Instead,
make the RPC server register for these notifications directly from the
chain server RPC client.

I'm not happy with this notification and how it's handled in the code,
but to not break existing clients this change is being made.  Fixing
the notifiation mess and modifying existing clients to use a new
notification API will need to be done sometime later.
This commit is contained in:
Josh Rickmar 2014-07-30 09:47:50 -05:00
parent 400153d7c0
commit 248ea9c08f
4 changed files with 99 additions and 68 deletions

View file

@ -38,6 +38,13 @@ type Client struct {
dequeueNotification chan interface{}
currentBlock chan *keystore.BlockStamp
// Notification channels regarding the state of the client. These exist
// so other components can listen in on chain activity. These are
// initialized as nil, and must be created by calling one of the Listen*
// methods.
connected chan bool
notificationLock sync.Locker
quit chan struct{}
wg sync.WaitGroup
started bool
@ -50,6 +57,7 @@ func NewClient(net *btcnet.Params, connect, user, pass string, certs []byte) (*C
enqueueNotification: make(chan interface{}),
dequeueNotification: make(chan interface{}),
currentBlock: make(chan *keystore.BlockStamp),
notificationLock: new(sync.Mutex),
quit: make(chan struct{}),
}
initializedClient := make(chan struct{})
@ -144,7 +152,6 @@ func (c *Client) BlockStamp() (*keystore.BlockStamp, error) {
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow
// blocking client calls.
type (
ClientConnected struct{}
BlockConnected keystore.BlockStamp
BlockDisconnected keystore.BlockStamp
RecvTx struct {
@ -188,7 +195,7 @@ func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err err
func (c *Client) onClientConnect() {
log.Info("Established websocket RPC connection to btcd")
c.enqueueNotification <- ClientConnected{}
c.notifyConnected(true)
}
func (c *Client) onBlockConnected(hash *btcwire.ShaHash, height int32) {
@ -308,3 +315,49 @@ out:
close(c.dequeueNotification)
c.wg.Done()
}
// ErrDuplicateListen is returned for any attempts to listen for the same
// notification more than once. If callers must pass along a notifiation to
// multiple places, they must broadcast it themself.
var ErrDuplicateListen = errors.New("duplicate listen")
type noopLocker struct{}
func (noopLocker) Lock() {}
func (noopLocker) Unlock() {}
// ListenConnected returns a channel that passes the current connection state
// of the client. This will be automatically sent to when the client is first
// connected, as well as the current state whenever NotifyConnected is
// forcibly called.
//
// If this is called twice, ErrDuplicateListen is returned.
func (c *Client) ListenConnected() (<-chan bool, error) {
c.notificationLock.Lock()
defer c.notificationLock.Unlock()
if c.connected != nil {
return nil, ErrDuplicateListen
}
c.connected = make(chan bool)
c.notificationLock = noopLocker{}
return c.connected, nil
}
func (c *Client) notifyConnected(connected bool) {
c.notificationLock.Lock()
if c.connected != nil {
c.connected <- connected
}
c.notificationLock.Unlock()
}
// NotifyConnected sends the channel notification for a connected or
// disconnected client. This is exported so it can be called by other
// packages which require notifying the current connection state.
//
// TODO: This shouldn't exist, but the current notification API requires it.
func (c *Client) NotifyConnected() {
connected := !c.Client.Disconnected()
c.notifyConnected(connected)
}

View file

@ -28,8 +28,6 @@ func (w *Wallet) handleChainNotifications() {
for n := range w.chainSvr.Notifications() {
var err error
switch n := n.(type) {
case chain.ClientConnected:
w.notifyChainServerConnected(true)
case chain.BlockConnected:
w.connectBlock(keystore.BlockStamp(n))
case chain.BlockDisconnected:

View file

@ -265,17 +265,18 @@ type rpcServer struct {
// Channels read from other components from which notifications are
// created.
connectedBlocks <-chan keystore.BlockStamp
disconnectedBlocks <-chan keystore.BlockStamp
newCredits <-chan txstore.Credit
newDebits <-chan txstore.Debits
minedCredits <-chan txstore.Credit
minedDebits <-chan txstore.Debits
keystoreLocked <-chan bool
confirmedBalance <-chan btcutil.Amount
unconfirmedBalance <-chan btcutil.Amount
chainServerConnected <-chan bool
registerWalletNtfns chan struct{}
connectedBlocks <-chan keystore.BlockStamp
disconnectedBlocks <-chan keystore.BlockStamp
newCredits <-chan txstore.Credit
newDebits <-chan txstore.Debits
minedCredits <-chan txstore.Credit
minedDebits <-chan txstore.Debits
keystoreLocked <-chan bool
confirmedBalance <-chan btcutil.Amount
unconfirmedBalance <-chan btcutil.Amount
chainServerConnected <-chan bool
registerWalletNtfns chan struct{}
registerChainSvrNtfns chan struct{}
// enqueueNotification and dequeueNotification handle both sides of an
// infinitly growing queue for websocket client notifications.
@ -310,6 +311,7 @@ func newRPCServer(listenAddrs []string, maxPost, maxWebsockets int64) (*rpcServe
registerWSC: make(chan *websocketClient),
unregisterWSC: make(chan *websocketClient),
registerWalletNtfns: make(chan struct{}),
registerChainSvrNtfns: make(chan struct{}),
enqueueNotification: make(chan wsClientNotification),
dequeueNotification: make(chan wsClientNotification),
notificationHandlerQuit: make(chan struct{}),
@ -514,7 +516,6 @@ func (s *rpcServer) SetWallet(wallet *Wallet) {
s.wallet = wallet
s.registerWalletNtfns <- struct{}{}
chainSvrConnected := false
if s.chainSvr != nil {
// If the chain server rpc client is also set, there's no reason
// to keep the mutex around. Make the locker simply execute
@ -525,14 +526,10 @@ func (s *rpcServer) SetWallet(wallet *Wallet) {
// ok to run.
s.handlerLookup = lookupAnyHandler
chainSvrConnected = !s.chainSvr.Disconnected()
// Make sure already connected websocket clients get a notification
// if the chain RPC client connection is set and connected.
s.chainSvr.NotifyConnected()
}
// Make sure already connected websocket clients get a notification
// if the chain RPC client connection is set and connected. This is
// run as a goroutine since it must acquire the handlerLock, which is
// locked here.
go wallet.notifyChainServerConnected(chainSvrConnected)
}
// SetChainServer sets the chain server client component needed to run a fully
@ -545,6 +542,8 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) {
defer s.handlerLock.Unlock()
s.chainSvr = chainSvr
s.registerChainSvrNtfns <- struct{}{}
if s.wallet != nil {
// If the wallet had already been set, there's no reason to keep
// the mutex around. Make the locker simply execute noops
@ -554,12 +553,6 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) {
// With both the chain server and wallet set, all handlers are
// ok to run.
s.handlerLookup = lookupAnyHandler
// Make sure already connected websocket clients get a
// notification if the chain RPC client connection is set and
// connected. This is run as a goroutine since it must acquire
// the handlerLock, which is locked here.
go s.wallet.notifyChainServerConnected(!chainSvr.Disconnected())
}
}
@ -877,8 +870,8 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) {
// TODO(jrick): this is crappy. kill it.
s.handlerLock.Lock()
if s.wallet != nil && s.chainSvr != nil {
s.wallet.notifyChainServerConnected(!s.chainSvr.Disconnected())
if s.chainSvr != nil {
s.chainSvr.NotifyConnected()
}
s.handlerLock.Unlock()
@ -1137,12 +1130,6 @@ out:
"balance changes: %v", err)
continue
}
chainServerConnected, err := s.wallet.ListenChainServerConnected()
if err != nil {
log.Errorf("Could not register for chain server "+
"connection changes: %v", err)
continue
}
s.connectedBlocks = connectedBlocks
s.disconnectedBlocks = disconnectedBlocks
s.newCredits = newCredits
@ -1152,8 +1139,25 @@ out:
s.keystoreLocked = keystoreLocked
s.confirmedBalance = confirmedBalance
s.unconfirmedBalance = unconfirmedBalance
case <-s.registerChainSvrNtfns:
chainServerConnected, err := s.chainSvr.ListenConnected()
if err != nil {
log.Errorf("Could not register for chain server "+
"connection changes: %v", err)
continue
}
s.chainServerConnected = chainServerConnected
// Make sure already connected websocket clients get a
// notification for the current client connection state.
//
// TODO(jrick): I am appalled by doing this but trying
// not to change how notifications work for the moment.
// A revamped notification API without this horror will
// be implemented soon.
go s.chainSvr.NotifyConnected()
case <-s.quit:
break out
}
@ -1175,6 +1179,7 @@ func (s *rpcServer) drainNotifications() {
case <-s.confirmedBalance:
case <-s.unconfirmedBalance:
case <-s.registerWalletNtfns:
case <-s.registerChainSvrNtfns:
}
}
}

View file

@ -95,13 +95,12 @@ type Wallet struct {
// Notification channels so other components can listen in on wallet
// activity. These are initialized as nil, and must be created by
// calling one of the Listen* methods.
connectedBlocks chan keystore.BlockStamp
disconnectedBlocks chan keystore.BlockStamp
lockStateChanges chan bool // true when locked
confirmedBalance chan btcutil.Amount
unconfirmedBalance chan btcutil.Amount
chainServerConnected chan bool
notificationLock sync.Locker
connectedBlocks chan keystore.BlockStamp
disconnectedBlocks chan keystore.BlockStamp
lockStateChanges chan bool // true when locked
confirmedBalance chan btcutil.Amount
unconfirmedBalance chan btcutil.Amount
notificationLock sync.Locker
wg sync.WaitGroup
quit chan struct{}
@ -148,8 +147,6 @@ func (w *Wallet) updateNotificationLock() {
case w.confirmedBalance == nil:
fallthrough
case w.unconfirmedBalance == nil:
fallthrough
case w.chainServerConnected == nil:
return
}
w.notificationLock = noopLocker{}
@ -241,18 +238,6 @@ func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) {
return w.unconfirmedBalance, nil
}
func (w *Wallet) ListenChainServerConnected() (<-chan bool, error) {
w.notificationLock.Lock()
defer w.notificationLock.Unlock()
if w.chainServerConnected != nil {
return nil, ErrDuplicateListen
}
w.chainServerConnected = make(chan bool)
w.updateNotificationLock()
return w.chainServerConnected, nil
}
func (w *Wallet) notifyConnectedBlock(block keystore.BlockStamp) {
w.notificationLock.Lock()
if w.connectedBlocks != nil {
@ -293,14 +278,6 @@ func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) {
w.notificationLock.Unlock()
}
func (w *Wallet) notifyChainServerConnected(connected bool) {
w.notificationLock.Lock()
if w.chainServerConnected != nil {
w.chainServerConnected <- connected
}
w.notificationLock.Unlock()
}
// openWallet opens a new wallet from disk.
func openWallet() (*Wallet, error) {
netdir := networkDir(activeNet.Params)
@ -376,8 +353,6 @@ func (w *Wallet) Start(chainServer *chain.Client) {
w.chainSvrLock.Lock()
defer w.chainSvrLock.Unlock()
w.notifyChainServerConnected(!chainServer.Disconnected())
w.chainSvr = chainServer
w.chainSvrLock = noopLocker{}