diff --git a/rpc/legacyrpc/server.go b/rpc/legacyrpc/server.go index 9118aa8..983e1f8 100644 --- a/rpc/legacyrpc/server.go +++ b/rpc/legacyrpc/server.go @@ -19,10 +19,8 @@ import ( "time" "github.com/btcsuite/btcd/btcjson" - "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/wallet" - "github.com/btcsuite/btcwallet/wtxmgr" "github.com/btcsuite/fastsha256" "github.com/btcsuite/websocket" ) @@ -74,32 +72,6 @@ type Server struct { maxPostClients int64 // Max concurrent HTTP POST clients. maxWebsocketClients int64 // Max concurrent websocket clients. - // Channels to register or unregister a websocket client for - // websocket notifications. - registerWSC chan *websocketClient - unregisterWSC chan *websocketClient - - // Channels read from other components from which notifications are - // created. - connectedBlocks <-chan wtxmgr.BlockMeta - disconnectedBlocks <-chan wtxmgr.BlockMeta - relevantTxs <-chan chain.RelevantTx - managerLocked <-chan bool - confirmedBalance <-chan btcutil.Amount - unconfirmedBalance <-chan btcutil.Amount - //chainServerConnected <-chan bool - registerWalletNtfns chan struct{} - - // enqueueNotification and dequeueNotification handle both sides of an - // infinitly growing queue for websocket client notifications. - enqueueNotification chan wsClientNotification - dequeueNotification chan wsClientNotification - - // notificationHandlerQuit is closed when the notification handler - // goroutine shuts down. After this is closed, no more notifications - // will be sent to any websocket client response channel. - notificationHandlerQuit chan struct{} - wg sync.WaitGroup quit chan struct{} quitMtx sync.Mutex @@ -138,12 +110,6 @@ func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Liste // Allow all origins. CheckOrigin: func(r *http.Request) bool { return true }, }, - registerWSC: make(chan *websocketClient), - unregisterWSC: make(chan *websocketClient), - registerWalletNtfns: make(chan struct{}), - enqueueNotification: make(chan wsClientNotification), - dequeueNotification: make(chan wsClientNotification), - notificationHandlerQuit: make(chan struct{}), quit: make(chan struct{}), requestShutdownChan: make(chan struct{}, 1), } @@ -191,11 +157,6 @@ func NewServer(opts *Options, walletLoader *wallet.Loader, listeners []net.Liste server.websocketClientRPC(wsc) })) - server.wg.Add(3) - go server.notificationListener() - go server.notificationQueue() - go server.notificationHandler() - for _, lis := range listeners { server.serve(lis) } @@ -240,7 +201,6 @@ func (s *Server) serve(lis net.Listener) { func (s *Server) RegisterWallet(w *wallet.Wallet) { s.handlerMu.Lock() s.wallet = w - s.registerWalletNtfns <- struct{}{} s.handlerMu.Unlock() } @@ -536,16 +496,6 @@ out: } } - // Remove websocket client from notification group, or if the server is - // shutting down, wait until the notification handler has finished - // running. This is needed to ensure that no more notifications will be - // sent to the client's responses chan before it's closed below. - select { - case s.unregisterWSC <- wsc: - case <-s.quit: - <-s.notificationHandlerQuit - } - // allow client to disconnect after all handler goroutines are done wsc.wg.Wait() close(wsc.responses) @@ -584,8 +534,8 @@ out: s.wg.Done() } -// websocketClientRPC starts the goroutines to serve JSON-RPC requests and -// notifications over a websocket connection for a single client. +// websocketClientRPC starts the goroutines to serve JSON-RPC requests over a +// websocket connection for a single client. func (s *Server) websocketClientRPC(wsc *websocketClient) { log.Infof("New websocket client %s", wsc.remoteAddr) @@ -595,14 +545,6 @@ func (s *Server) websocketClientRPC(wsc *websocketClient) { log.Warnf("Cannot remove read deadline: %v", err) } - // Add client context so notifications duplicated to each - // client are received by this client. - select { - case s.registerWSC <- wsc: - case <-s.quit: - return - } - // WebsocketClientRead is intentionally not run with the waitgroup // so it is ignored during shutdown. This is to prevent a hang during // shutdown where the goroutine is blocked on a read of the @@ -698,262 +640,3 @@ func (s *Server) requestProcessShutdown() { func (s *Server) RequestProcessShutdown() <-chan struct{} { return s.requestShutdownChan } - -// Notification messages for websocket clients. -type ( - wsClientNotification interface { - // This returns a slice only because some of these types result - // in multpile client notifications. - notificationCmds(w *wallet.Wallet) []interface{} - } - - blockConnected wtxmgr.BlockMeta - blockDisconnected wtxmgr.BlockMeta - - relevantTx chain.RelevantTx - - managerLocked bool - - confirmedBalance btcutil.Amount - unconfirmedBalance btcutil.Amount - - btcdConnected bool -) - -func (b blockConnected) notificationCmds(w *wallet.Wallet) []interface{} { - n := btcjson.NewBlockConnectedNtfn(b.Hash.String(), b.Height, b.Time.Unix()) - return []interface{}{n} -} - -func (b blockDisconnected) notificationCmds(w *wallet.Wallet) []interface{} { - n := btcjson.NewBlockDisconnectedNtfn(b.Hash.String(), b.Height, b.Time.Unix()) - return []interface{}{n} -} - -func (t relevantTx) notificationCmds(w *wallet.Wallet) []interface{} { - syncBlock := w.Manager.SyncedTo() - - var block *wtxmgr.Block - if t.Block != nil { - block = &t.Block.Block - } - details, err := w.TxStore.UniqueTxDetails(&t.TxRecord.Hash, block) - if err != nil { - log.Errorf("Cannot fetch transaction details for "+ - "client notification: %v", err) - return nil - } - if details == nil { - log.Errorf("No details found for client transaction notification") - return nil - } - - ltr := wallet.ListTransactions(details, w.Manager, syncBlock.Height, - w.ChainParams()) - ntfns := make([]interface{}, len(ltr)) - for i := range ntfns { - ntfns[i] = btcjson.NewNewTxNtfn(ltr[i].Account, ltr[i]) - } - return ntfns -} - -func (l managerLocked) notificationCmds(w *wallet.Wallet) []interface{} { - n := btcjson.NewWalletLockStateNtfn(bool(l)) - return []interface{}{n} -} - -func (b confirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} { - n := btcjson.NewAccountBalanceNtfn("", - btcutil.Amount(b).ToBTC(), true) - return []interface{}{n} -} - -func (b unconfirmedBalance) notificationCmds(w *wallet.Wallet) []interface{} { - n := btcjson.NewAccountBalanceNtfn("", - btcutil.Amount(b).ToBTC(), false) - return []interface{}{n} -} - -func (b btcdConnected) notificationCmds(w *wallet.Wallet) []interface{} { - n := btcjson.NewBtcdConnectedNtfn(bool(b)) - return []interface{}{n} -} - -func (s *Server) notificationListener() { -out: - for { - select { - case n := <-s.connectedBlocks: - s.enqueueNotification <- blockConnected(n) - case n := <-s.disconnectedBlocks: - s.enqueueNotification <- blockDisconnected(n) - case n := <-s.relevantTxs: - s.enqueueNotification <- relevantTx(n) - case n := <-s.managerLocked: - s.enqueueNotification <- managerLocked(n) - case n := <-s.confirmedBalance: - s.enqueueNotification <- confirmedBalance(n) - case n := <-s.unconfirmedBalance: - s.enqueueNotification <- unconfirmedBalance(n) - - // Registration of all notifications is done by the handler so - // it doesn't require another Server mutex. - case <-s.registerWalletNtfns: - connectedBlocks, err := s.wallet.ListenConnectedBlocks() - if err != nil { - log.Errorf("Could not register for new "+ - "connected block notifications: %v", - err) - continue - } - disconnectedBlocks, err := s.wallet.ListenDisconnectedBlocks() - if err != nil { - log.Errorf("Could not register for new "+ - "disconnected block notifications: %v", - err) - continue - } - relevantTxs, err := s.wallet.ListenRelevantTxs() - if err != nil { - log.Errorf("Could not register for new relevant "+ - "transaction notifications: %v", err) - continue - } - managerLocked, err := s.wallet.ListenLockStatus() - if err != nil { - log.Errorf("Could not register for manager "+ - "lock state changes: %v", err) - continue - } - confirmedBalance, err := s.wallet.ListenConfirmedBalance() - if err != nil { - log.Errorf("Could not register for confirmed "+ - "balance changes: %v", err) - continue - } - unconfirmedBalance, err := s.wallet.ListenUnconfirmedBalance() - if err != nil { - log.Errorf("Could not register for unconfirmed "+ - "balance changes: %v", err) - continue - } - s.connectedBlocks = connectedBlocks - s.disconnectedBlocks = disconnectedBlocks - s.relevantTxs = relevantTxs - s.managerLocked = managerLocked - s.confirmedBalance = confirmedBalance - s.unconfirmedBalance = unconfirmedBalance - - case <-s.quit: - break out - } - } - close(s.enqueueNotification) - go s.drainNotifications() - s.wg.Done() -} - -func (s *Server) drainNotifications() { - for { - select { - case <-s.connectedBlocks: - case <-s.disconnectedBlocks: - case <-s.relevantTxs: - case <-s.managerLocked: - case <-s.confirmedBalance: - case <-s.unconfirmedBalance: - case <-s.registerWalletNtfns: - } - } -} - -// notificationQueue manages an infinitly-growing queue of notifications that -// wallet websocket clients may be interested in. It quits when the -// enqueueNotification channel is closed, dropping any still pending -// notifications. -func (s *Server) notificationQueue() { - var q []wsClientNotification - var dequeue chan<- wsClientNotification - skipQueue := s.dequeueNotification - var next wsClientNotification -out: - for { - select { - case n, ok := <-s.enqueueNotification: - if !ok { - // Sender closed input channel. - break out - } - - // Either send to out immediately if skipQueue is - // non-nil (queue is empty) and reader is ready, - // or append to the queue and send later. - select { - case skipQueue <- n: - default: - q = append(q, n) - dequeue = s.dequeueNotification - skipQueue = nil - next = q[0] - } - - case dequeue <- next: - q[0] = nil // avoid leak - q = q[1:] - if len(q) == 0 { - dequeue = nil - skipQueue = s.dequeueNotification - } else { - next = q[0] - } - } - } - close(s.dequeueNotification) - s.wg.Done() -} - -func (s *Server) notificationHandler() { - clients := make(map[chan struct{}]*websocketClient) -out: - for { - select { - case c := <-s.registerWSC: - clients[c.quit] = c - - case c := <-s.unregisterWSC: - delete(clients, c.quit) - - case nmsg, ok := <-s.dequeueNotification: - // No more notifications. - if !ok { - break out - } - - // Ignore if there are no clients to receive the - // notification. - if len(clients) == 0 { - continue - } - - ns := nmsg.notificationCmds(s.wallet) - for _, n := range ns { - mn, err := btcjson.MarshalCmd(nil, n) - // All notifications are expected to be - // marshalable. - if err != nil { - panic(err) - } - for _, c := range clients { - if err := c.send(mn); err != nil { - delete(clients, c.quit) - } - } - } - - case <-s.quit: - break out - } - } - close(s.notificationHandlerQuit) - s.wg.Done() -} diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index 82c63a0..9a0f1c8 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -71,10 +71,6 @@ func (w *Wallet) connectBlock(b wtxmgr.BlockMeta) { // Notify interested clients of the connected block. w.NtfnServer.notifyAttachedBlock(&b) - - // Legacy JSON-RPC notifications - w.notifyConnectedBlock(b) - w.notifyBalances(b.Height) } // disconnectBlock handles a chain server reorganize by rolling back all @@ -114,10 +110,6 @@ func (w *Wallet) disconnectBlock(b wtxmgr.BlockMeta) error { // Notify interested clients of the disconnected block. w.NtfnServer.notifyDetachedBlock(&b.Hash) - // Legacy JSON-RPC notifications - w.notifyDisconnectedBlock(b) - w.notifyBalances(b.Height - 1) - return nil } @@ -220,38 +212,5 @@ func (w *Wallet) addRelevantTx(rec *wtxmgr.TxRecord, block *wtxmgr.BlockMeta) er } } - // Legacy JSON-RPC notifications - // - // TODO: Synced-to information should be handled by the wallet, not the - // RPC client. - chainClient, err := w.requireChainClient() - if err == nil { - bs, err := chainClient.BlockStamp() - if err == nil { - w.notifyBalances(bs.Height) - } - } - return nil } - -func (w *Wallet) notifyBalances(curHeight int32) { - // Don't notify unless wallet is synced to the chain server. - if !w.ChainSynced() { - return - } - - // Notify any potential changes to the balance. - confirmed, err := w.TxStore.Balance(1, curHeight) - if err != nil { - log.Errorf("Cannot determine 1-conf balance: %v", err) - return - } - w.notifyConfirmedBalance(confirmed) - unconfirmed, err := w.TxStore.Balance(0, curHeight) - if err != nil { - log.Errorf("Cannot determine 0-conf balance: %v", err) - return - } - w.notifyUnconfirmedBalance(unconfirmed - confirmed) -} diff --git a/wallet/rescan.go b/wallet/rescan.go index 2a03699..79c99e9 100644 --- a/wallet/rescan.go +++ b/wallet/rescan.go @@ -201,20 +201,6 @@ out: go w.ResendUnminedTxs() - // TODO(jrick): The current websocket API requires - // notifying the block the rescan synced through to - // every connected client. This is code smell and - // should be removed or replaced with a more - // appropiate notification when the API is redone. - b := wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - *n.Hash, - n.Height, - }, - Time: n.Time, - } - w.notifyConnectedBlock(b) - case <-quit: break out } diff --git a/wallet/wallet.go b/wallet/wallet.go index 98cdcea..e23bf23 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -99,20 +99,6 @@ type Wallet struct { NtfnServer *NotificationServer - // Legacy notification channels so other components can listen in on - // wallet activity. These are initialized as nil, and must be created - // by calling one of the Listen* methods. - // - // These channels and the features needed by them are on a fast path to - // deletion. Use the server instead. - connectedBlocks chan wtxmgr.BlockMeta - disconnectedBlocks chan wtxmgr.BlockMeta - relevantTxs chan chain.RelevantTx - lockStateChanges chan bool // true when locked - confirmedBalance chan btcutil.Amount - unconfirmedBalance chan btcutil.Amount - notificationMu sync.Mutex - chainParams *chaincfg.Params wg sync.WaitGroup @@ -121,156 +107,6 @@ type Wallet struct { quitMu sync.Mutex } -// ErrDuplicateListen is returned for any attempts to listen for the same -// notification more than once. If callers must pass along a notifiation to -// multiple places, they must broadcast it themself. -var ErrDuplicateListen = errors.New("duplicate listen") - -// ListenConnectedBlocks returns a channel that passes all blocks that a wallet -// has been marked in sync with. The channel must be read, or other wallet -// methods will block. -// -// If this is called twice, ErrDuplicateListen is returned. -func (w *Wallet) ListenConnectedBlocks() (<-chan wtxmgr.BlockMeta, error) { - defer w.notificationMu.Unlock() - w.notificationMu.Lock() - - if w.connectedBlocks != nil { - return nil, ErrDuplicateListen - } - w.connectedBlocks = make(chan wtxmgr.BlockMeta) - return w.connectedBlocks, nil -} - -// ListenDisconnectedBlocks returns a channel that passes all blocks that a -// wallet has detached. The channel must be read, or other wallet methods will -// block. -// -// If this is called twice, ErrDuplicateListen is returned. -func (w *Wallet) ListenDisconnectedBlocks() (<-chan wtxmgr.BlockMeta, error) { - defer w.notificationMu.Unlock() - w.notificationMu.Lock() - - if w.disconnectedBlocks != nil { - return nil, ErrDuplicateListen - } - w.disconnectedBlocks = make(chan wtxmgr.BlockMeta) - return w.disconnectedBlocks, nil -} - -// ListenLockStatus returns a channel that passes the current lock state -// of the wallet whenever the lock state is changed. The value is true for -// locked, and false for unlocked. The channel must be read, or other wallet -// methods will block. -// -// If this is called twice, ErrDuplicateListen is returned. -func (w *Wallet) ListenLockStatus() (<-chan bool, error) { - defer w.notificationMu.Unlock() - w.notificationMu.Lock() - - if w.lockStateChanges != nil { - return nil, ErrDuplicateListen - } - w.lockStateChanges = make(chan bool) - return w.lockStateChanges, nil -} - -// ListenConfirmedBalance returns a channel that passes the confirmed balance -// when any changes to the balance are made. This channel must be read, or -// other wallet methods will block. -// -// If this is called twice, ErrDuplicateListen is returned. -func (w *Wallet) ListenConfirmedBalance() (<-chan btcutil.Amount, error) { - defer w.notificationMu.Unlock() - w.notificationMu.Lock() - - if w.confirmedBalance != nil { - return nil, ErrDuplicateListen - } - w.confirmedBalance = make(chan btcutil.Amount) - return w.confirmedBalance, nil -} - -// ListenUnconfirmedBalance returns a channel that passes the unconfirmed -// balance when any changes to the balance are made. This channel must be -// read, or other wallet methods will block. -// -// If this is called twice, ErrDuplicateListen is returned. -func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) { - defer w.notificationMu.Unlock() - w.notificationMu.Lock() - - if w.unconfirmedBalance != nil { - return nil, ErrDuplicateListen - } - w.unconfirmedBalance = make(chan btcutil.Amount) - return w.unconfirmedBalance, nil -} - -// ListenRelevantTxs returns a channel that passes all transactions relevant to -// a wallet, optionally including metadata regarding the block they were mined -// in. This channel must be read, or other wallet methods will block. -// -// If this is called twice, ErrDuplicateListen is returned. -func (w *Wallet) ListenRelevantTxs() (<-chan chain.RelevantTx, error) { - defer w.notificationMu.Unlock() - w.notificationMu.Lock() - - if w.relevantTxs != nil { - return nil, ErrDuplicateListen - } - w.relevantTxs = make(chan chain.RelevantTx) - return w.relevantTxs, nil -} - -func (w *Wallet) notifyConnectedBlock(block wtxmgr.BlockMeta) { - w.notificationMu.Lock() - if w.connectedBlocks != nil { - w.connectedBlocks <- block - } - w.notificationMu.Unlock() -} - -func (w *Wallet) notifyDisconnectedBlock(block wtxmgr.BlockMeta) { - w.notificationMu.Lock() - if w.disconnectedBlocks != nil { - w.disconnectedBlocks <- block - } - w.notificationMu.Unlock() -} - -func (w *Wallet) notifyLockStateChange(locked bool) { - w.notificationMu.Lock() - if w.lockStateChanges != nil { - w.lockStateChanges <- locked - } - w.notificationMu.Unlock() -} - -func (w *Wallet) notifyConfirmedBalance(bal btcutil.Amount) { - w.notificationMu.Lock() - if w.confirmedBalance != nil { - w.confirmedBalance <- bal - } - w.notificationMu.Unlock() -} - -func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) { - w.notificationMu.Lock() - if w.unconfirmedBalance != nil { - w.unconfirmedBalance <- bal - } - w.notificationMu.Unlock() -} - -func (w *Wallet) notifyRelevantTx(relevantTx chain.RelevantTx) { - w.notificationMu.Lock() - if w.relevantTxs != nil { - w.relevantTxs <- relevantTx - } - w.notificationMu.Unlock() -} - // Start starts the goroutines necessary to manage a wallet. func (w *Wallet) Start() { w.quitMu.Lock() @@ -661,7 +497,6 @@ out: req.err <- err continue } - w.notifyLockStateChange(false) timeout = req.lockAfter req.err <- nil continue @@ -709,8 +544,6 @@ out: err := w.Manager.Lock() if err != nil && !waddrmgr.IsError(err, waddrmgr.ErrLocked) { log.Errorf("Could not lock wallet: %v", err) - } else { - w.notifyLockStateChange(true) } } w.wg.Done()