Remove legacy JSON-RPC notifications.

These notifications were added to support real time updates for
btcgui.  As the btcgui project is no longer being developed, there are
no more consumers of this API, and it makes sense to remove them given
their various issues (the largest being that notifiations are sent
unsubscribed to clients that may never be interrested in them).

A new notification server has already been added to the wallet package
to handle notifications in a RPC-server agnostic way.  This server is
the means by which the wallet notifies changes for gRPC clients.  If
per-client registered notifications are to be re-added for the
JSON-RPC server, they should be integrated with the new notification
server rather than using this legacy code.
This commit is contained in:
Josh Rickmar 2016-03-11 14:14:33 -05:00
parent 9fe02c43ca
commit 6cf22b7944
4 changed files with 2 additions and 541 deletions

View file

@ -19,10 +19,8 @@ import (
"time"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/btcsuite/fastsha256"
"github.com/btcsuite/websocket"
)
@ -74,32 +72,6 @@ type Server struct {
maxPostClients int64 // Max concurrent HTTP POST clients.
maxWebsocketClients int64 // Max concurrent websocket clients.
// Channels to register or unregister a websocket client for
// websocket notifications.
registerWSC chan *websocketClient
unregisterWSC chan *websocketClient
// Channels read from other components from which notifications are
// created.
connectedBlocks <-chan wtxmgr.BlockMeta
disconnectedBlocks <-chan wtxmgr.BlockMeta
relevantTxs <-chan chain.RelevantTx
managerLocked <-chan bool
confirmedBalance <-chan btcutil.Amount
unconfirmedBalance <-chan btcutil.Amount
//chainServerConnected <-chan bool
registerWalletNtfns chan struct{}
// enqueueNotification and dequeueNotification handle both sides of an
// infinitly growing queue for websocket client notifications.
enqueueNotification chan wsClientNotification
dequeueNotification chan wsClientNotification
// notificationHandlerQuit is closed when the notification handler
// goroutine shuts down. After this is closed, no more notifications
// will be sent to any websocket client response channel.
notificationHandlerQuit chan struct{}
wg sync.WaitGroup
quit chan struct{}
quitMtx sync.Mutex
@ -138,12 +110,6 @@ func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Liste
// Allow all origins.
CheckOrigin: func(r *http.Request) bool { return true },
},
registerWSC: make(chan *websocketClient),
unregisterWSC: make(chan *websocketClient),
registerWalletNtfns: make(chan struct{}),
enqueueNotification: make(chan wsClientNotification),
dequeueNotification: make(chan wsClientNotification),
notificationHandlerQuit: make(chan struct{}),
quit: make(chan struct{}),
requestShutdownChan: make(chan struct{}, 1),
}
@ -191,11 +157,6 @@ func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Liste
server.websocketClientRPC(wsc)
}))
server.wg.Add(3)
go server.notificationListener()
go server.notificationQueue()
go server.notificationHandler()
for _, lis := range listeners {
server.serve(lis)
}
@ -240,7 +201,6 @@ func (s *Server) serve(lis net.Listener) {
func (s *Server) RegisterWallet(w *wallet.Wallet) {
s.handlerMu.Lock()
s.wallet = w
s.registerWalletNtfns <- struct{}{}
s.handlerMu.Unlock()
}
@ -536,16 +496,6 @@ out:
}
}
// Remove websocket client from notification group, or if the server is
// shutting down, wait until the notification handler has finished
// running. This is needed to ensure that no more notifications will be
// sent to the client's responses chan before it's closed below.
select {
case s.unregisterWSC <- wsc:
case <-s.quit:
<-s.notificationHandlerQuit
}
// allow client to disconnect after all handler goroutines are done
wsc.wg.Wait()
close(wsc.responses)
@ -584,8 +534,8 @@ out:
s.wg.Done()
}
// websocketClientRPC starts the goroutines to serve JSON-RPC requests and
// notifications over a websocket connection for a single client.
// websocketClientRPC starts the goroutines to serve JSON-RPC requests over a
// websocket connection for a single client.
func (s *Server) websocketClientRPC(wsc *websocketClient) {
log.Infof("New websocket client %s", wsc.remoteAddr)
@ -595,14 +545,6 @@ func (s *Server) websocketClientRPC(wsc *websocketClient) {
log.Warnf("Cannot remove read deadline: %v", err)
}
// Add client context so notifications duplicated to each
// client are received by this client.
select {
case s.registerWSC <- wsc:
case <-s.quit:
return
}
// WebsocketClientRead is intentionally not run with the waitgroup
// so it is ignored during shutdown. This is to prevent a hang during
// shutdown where the goroutine is blocked on a read of the
@ -698,262 +640,3 @@ func (s *Server) requestProcessShutdown() {
func (s *Server) RequestProcessShutdown() <-chan struct{} {
return s.requestShutdownChan
}
// Notification messages for websocket clients.
type (
wsClientNotification interface {
// This returns a slice only because some of these types result
// in multpile client notifications.
notificationCmds(w *wallet.Wallet) []interface{}
}
blockConnected wtxmgr.BlockMeta
blockDisconnected wtxmgr.BlockMeta
relevantTx chain.RelevantTx
managerLocked bool
confirmedBalance btcutil.Amount
unconfirmedBalance btcutil.Amount
btcdConnected bool
)
func (b blockConnected) notificationCmds(w *wallet.Wallet) []interface{} {
n := btcjson.NewBlockConnectedNtfn(b.Hash.String(), b.Height, b.Time.Unix())
return []interface{}{n}
}
func (b blockDisconnected) notificationCmds(w *wallet.Wallet) []interface{} {
n := btcjson.NewBlockDisconnectedNtfn(b.Hash.String(), b.Height, b.Time.Unix())
return []interface{}{n}
}
func (t relevantTx) notificationCmds(w *wallet.Wallet) []interface{} {
syncBlock := w.Manager.SyncedTo()
var block *wtxmgr.Block
if t.Block != nil {
block = &t.Block.Block
}
details, err := w.TxStore.UniqueTxDetails(&t.TxRecord.Hash, block)
if err != nil {
log.Errorf("Cannot fetch transaction details for "+
"client notification: %v", err)
return nil
}
if details == nil {
log.Errorf("No details found for client transaction notification")
return nil
}
ltr := wallet.ListTransactions(details, w.Manager, syncBlock.Height,
w.ChainParams())
ntfns := make([]interface{}, len(ltr))
for i := range ntfns {
ntfns[i] = btcjson.NewNewTxNtfn(ltr[i].Account, ltr[i])
}
return ntfns
}
func (l managerLocked) notificationCmds(w *wallet.Wallet) []interface{} {
n := btcjson.NewWalletLockStateNtfn(bool(l))
return []interface{}{n}
}
func (b confirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} {
n := btcjson.NewAccountBalanceNtfn("",
btcutil.Amount(b).ToBTC(), true)
return []interface{}{n}
}
func (b unconfirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} {
n := btcjson.NewAccountBalanceNtfn("",
btcutil.Amount(b).ToBTC(), false)
return []interface{}{n}
}
func (b btcdConnected) notificationCmds(w *wallet.Wallet) []interface{} {
n := btcjson.NewBtcdConnectedNtfn(bool(b))
return []interface{}{n}
}
func (s *Server) notificationListener() {
out:
for {
select {
case n := <-s.connectedBlocks:
s.enqueueNotification <- blockConnected(n)
case n := <-s.disconnectedBlocks:
s.enqueueNotification <- blockDisconnected(n)
case n := <-s.relevantTxs:
s.enqueueNotification <- relevantTx(n)
case n := <-s.managerLocked:
s.enqueueNotification <- managerLocked(n)
case n := <-s.confirmedBalance:
s.enqueueNotification <- confirmedBalance(n)
case n := <-s.unconfirmedBalance:
s.enqueueNotification <- unconfirmedBalance(n)
// Registration of all notifications is done by the handler so
// it doesn't require another Server mutex.
case <-s.registerWalletNtfns:
connectedBlocks, err := s.wallet.ListenConnectedBlocks()
if err != nil {
log.Errorf("Could not register for new "+
"connected block notifications: %v",
err)
continue
}
disconnectedBlocks, err := s.wallet.ListenDisconnectedBlocks()
if err != nil {
log.Errorf("Could not register for new "+
"disconnected block notifications: %v",
err)
continue
}
relevantTxs, err := s.wallet.ListenRelevantTxs()
if err != nil {
log.Errorf("Could not register for new relevant "+
"transaction notifications: %v", err)
continue
}
managerLocked, err := s.wallet.ListenLockStatus()
if err != nil {
log.Errorf("Could not register for manager "+
"lock state changes: %v", err)
continue
}
confirmedBalance, err := s.wallet.ListenConfirmedBalance()
if err != nil {
log.Errorf("Could not register for confirmed "+
"balance changes: %v", err)
continue
}
unconfirmedBalance, err := s.wallet.ListenUnconfirmedBalance()
if err != nil {
log.Errorf("Could not register for unconfirmed "+
"balance changes: %v", err)
continue
}
s.connectedBlocks = connectedBlocks
s.disconnectedBlocks = disconnectedBlocks
s.relevantTxs = relevantTxs
s.managerLocked = managerLocked
s.confirmedBalance = confirmedBalance
s.unconfirmedBalance = unconfirmedBalance
case <-s.quit:
break out
}
}
close(s.enqueueNotification)
go s.drainNotifications()
s.wg.Done()
}
func (s *Server) drainNotifications() {
for {
select {
case <-s.connectedBlocks:
case <-s.disconnectedBlocks:
case <-s.relevantTxs:
case <-s.managerLocked:
case <-s.confirmedBalance:
case <-s.unconfirmedBalance:
case <-s.registerWalletNtfns:
}
}
}
// notificationQueue manages an infinitly-growing queue of notifications that
// wallet websocket clients may be interested in. It quits when the
// enqueueNotification channel is closed, dropping any still pending
// notifications.
func (s *Server) notificationQueue() {
var q []wsClientNotification
var dequeue chan<- wsClientNotification
skipQueue := s.dequeueNotification
var next wsClientNotification
out:
for {
select {
case n, ok := <-s.enqueueNotification:
if !ok {
// Sender closed input channel.
break out
}
// Either send to out immediately if skipQueue is
// non-nil (queue is empty) and reader is ready,
// or append to the queue and send later.
select {
case skipQueue <- n:
default:
q = append(q, n)
dequeue = s.dequeueNotification
skipQueue = nil
next = q[0]
}
case dequeue <- next:
q[0] = nil // avoid leak
q = q[1:]
if len(q) == 0 {
dequeue = nil
skipQueue = s.dequeueNotification
} else {
next = q[0]
}
}
}
close(s.dequeueNotification)
s.wg.Done()
}
func (s *Server) notificationHandler() {
clients := make(map[chan struct{}]*websocketClient)
out:
for {
select {
case c := <-s.registerWSC:
clients[c.quit] = c
case c := <-s.unregisterWSC:
delete(clients, c.quit)
case nmsg, ok := <-s.dequeueNotification:
// No more notifications.
if !ok {
break out
}
// Ignore if there are no clients to receive the
// notification.
if len(clients) == 0 {
continue
}
ns := nmsg.notificationCmds(s.wallet)
for _, n := range ns {
mn, err := btcjson.MarshalCmd(nil, n)
// All notifications are expected to be
// marshalable.
if err != nil {
panic(err)
}
for _, c := range clients {
if err := c.send(mn); err != nil {
delete(clients, c.quit)
}
}
}
case <-s.quit:
break out
}
}
close(s.notificationHandlerQuit)
s.wg.Done()
}

View file

@ -71,10 +71,6 @@ func (w *Wallet) connectBlock(b wtxmgr.BlockMeta) {
// Notify interested clients of the connected block.
w.NtfnServer.notifyAttachedBlock(&b)
// Legacy JSON-RPC notifications
w.notifyConnectedBlock(b)
w.notifyBalances(b.Height)
}
// disconnectBlock handles a chain server reorganize by rolling back all
@ -114,10 +110,6 @@ func (w *Wallet) disconnectBlock(b wtxmgr.BlockMeta) error {
// Notify interested clients of the disconnected block.
w.NtfnServer.notifyDetachedBlock(&b.Hash)
// Legacy JSON-RPC notifications
w.notifyDisconnectedBlock(b)
w.notifyBalances(b.Height - 1)
return nil
}
@ -220,38 +212,5 @@ func (w *Wallet) addRelevantTx(rec *wtxmgr.TxRecord, block *wtxmgr.BlockMeta) er
}
}
// Legacy JSON-RPC notifications
//
// TODO: Synced-to information should be handled by the wallet, not the
// RPC client.
chainClient, err := w.requireChainClient()
if err == nil {
bs, err := chainClient.BlockStamp()
if err == nil {
w.notifyBalances(bs.Height)
}
}
return nil
}
func (w *Wallet) notifyBalances(curHeight int32) {
// Don't notify unless wallet is synced to the chain server.
if !w.ChainSynced() {
return
}
// Notify any potential changes to the balance.
confirmed, err := w.TxStore.Balance(1, curHeight)
if err != nil {
log.Errorf("Cannot determine 1-conf balance: %v", err)
return
}
w.notifyConfirmedBalance(confirmed)
unconfirmed, err := w.TxStore.Balance(0, curHeight)
if err != nil {
log.Errorf("Cannot determine 0-conf balance: %v", err)
return
}
w.notifyUnconfirmedBalance(unconfirmed - confirmed)
}

View file

@ -201,20 +201,6 @@ out:
go w.ResendUnminedTxs()
// TODO(jrick): The current websocket API requires
// notifying the block the rescan synced through to
// every connected client. This is code smell and
// should be removed or replaced with a more
// appropiate notification when the API is redone.
b := wtxmgr.BlockMeta{
Block: wtxmgr.Block{
*n.Hash,
n.Height,
},
Time: n.Time,
}
w.notifyConnectedBlock(b)
case <-quit:
break out
}

View file

@ -99,20 +99,6 @@ type Wallet struct {
NtfnServer *NotificationServer
// Legacy notification channels so other components can listen in on
// wallet activity. These are initialized as nil, and must be created
// by calling one of the Listen* methods.
//
// These channels and the features needed by them are on a fast path to
// deletion. Use the server instead.
connectedBlocks chan wtxmgr.BlockMeta
disconnectedBlocks chan wtxmgr.BlockMeta
relevantTxs chan chain.RelevantTx
lockStateChanges chan bool // true when locked
confirmedBalance chan btcutil.Amount
unconfirmedBalance chan btcutil.Amount
notificationMu sync.Mutex
chainParams *chaincfg.Params
wg sync.WaitGroup
@ -121,156 +107,6 @@ type Wallet struct {
quitMu sync.Mutex
}
// ErrDuplicateListen is returned for any attempts to listen for the same
// notification more than once. If callers must pass along a notifiation to
// multiple places, they must broadcast it themself.
var ErrDuplicateListen = errors.New("duplicate listen")
// ListenConnectedBlocks returns a channel that passes all blocks that a wallet
// has been marked in sync with. The channel must be read, or other wallet
// methods will block.
//
// If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenConnectedBlocks() (<-chan wtxmgr.BlockMeta, error) {
defer w.notificationMu.Unlock()
w.notificationMu.Lock()
if w.connectedBlocks != nil {
return nil, ErrDuplicateListen
}
w.connectedBlocks = make(chan wtxmgr.BlockMeta)
return w.connectedBlocks, nil
}
// ListenDisconnectedBlocks returns a channel that passes all blocks that a
// wallet has detached. The channel must be read, or other wallet methods will
// block.
//
// If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenDisconnectedBlocks() (<-chan wtxmgr.BlockMeta, error) {
defer w.notificationMu.Unlock()
w.notificationMu.Lock()
if w.disconnectedBlocks != nil {
return nil, ErrDuplicateListen
}
w.disconnectedBlocks = make(chan wtxmgr.BlockMeta)
return w.disconnectedBlocks, nil
}
// ListenLockStatus returns a channel that passes the current lock state
// of the wallet whenever the lock state is changed. The value is true for
// locked, and false for unlocked. The channel must be read, or other wallet
// methods will block.
//
// If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenLockStatus() (<-chan bool, error) {
defer w.notificationMu.Unlock()
w.notificationMu.Lock()
if w.lockStateChanges != nil {
return nil, ErrDuplicateListen
}
w.lockStateChanges = make(chan bool)
return w.lockStateChanges, nil
}
// ListenConfirmedBalance returns a channel that passes the confirmed balance
// when any changes to the balance are made. This channel must be read, or
// other wallet methods will block.
//
// If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenConfirmedBalance() (<-chan btcutil.Amount, error) {
defer w.notificationMu.Unlock()
w.notificationMu.Lock()
if w.confirmedBalance != nil {
return nil, ErrDuplicateListen
}
w.confirmedBalance = make(chan btcutil.Amount)
return w.confirmedBalance, nil
}
// ListenUnconfirmedBalance returns a channel that passes the unconfirmed
// balance when any changes to the balance are made. This channel must be
// read, or other wallet methods will block.
//
// If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) {
defer w.notificationMu.Unlock()
w.notificationMu.Lock()
if w.unconfirmedBalance != nil {
return nil, ErrDuplicateListen
}
w.unconfirmedBalance = make(chan btcutil.Amount)
return w.unconfirmedBalance, nil
}
// ListenRelevantTxs returns a channel that passes all transactions relevant to
// a wallet, optionally including metadata regarding the block they were mined
// in. This channel must be read, or other wallet methods will block.
//
// If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenRelevantTxs() (<-chan chain.RelevantTx, error) {
defer w.notificationMu.Unlock()
w.notificationMu.Lock()
if w.relevantTxs != nil {
return nil, ErrDuplicateListen
}
w.relevantTxs = make(chan chain.RelevantTx)
return w.relevantTxs, nil
}
func (w *Wallet) notifyConnectedBlock(block wtxmgr.BlockMeta) {
w.notificationMu.Lock()
if w.connectedBlocks != nil {
w.connectedBlocks <- block
}
w.notificationMu.Unlock()
}
func (w *Wallet) notifyDisconnectedBlock(block wtxmgr.BlockMeta) {
w.notificationMu.Lock()
if w.disconnectedBlocks != nil {
w.disconnectedBlocks <- block
}
w.notificationMu.Unlock()
}
func (w *Wallet) notifyLockStateChange(locked bool) {
w.notificationMu.Lock()
if w.lockStateChanges != nil {
w.lockStateChanges <- locked
}
w.notificationMu.Unlock()
}
func (w *Wallet) notifyConfirmedBalance(bal btcutil.Amount) {
w.notificationMu.Lock()
if w.confirmedBalance != nil {
w.confirmedBalance <- bal
}
w.notificationMu.Unlock()
}
func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) {
w.notificationMu.Lock()
if w.unconfirmedBalance != nil {
w.unconfirmedBalance <- bal
}
w.notificationMu.Unlock()
}
func (w *Wallet) notifyRelevantTx(relevantTx chain.RelevantTx) {
w.notificationMu.Lock()
if w.relevantTxs != nil {
w.relevantTxs <- relevantTx
}
w.notificationMu.Unlock()
}
// Start starts the goroutines necessary to manage a wallet.
func (w *Wallet) Start() {
w.quitMu.Lock()
@ -661,7 +497,6 @@ out:
req.err <- err
continue
}
w.notifyLockStateChange(false)
timeout = req.lockAfter
req.err <- nil
continue
@ -709,8 +544,6 @@ out:
err := w.Manager.Lock()
if err != nil && !waddrmgr.IsError(err, waddrmgr.ErrLocked) {
log.Errorf("Could not lock wallet: %v", err)
} else {
w.notifyLockStateChange(true)
}
}
w.wg.Done()