diff --git a/rpcserver.go b/rpcserver.go index 54459f96..d9a6db52 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -148,9 +148,6 @@ func (s *rpcServer) Start() { jsonRPCRead(w, r, s) }) - s.wg.Add(1) - go s.walletListenerDuplicator() - rpcServeMux.HandleFunc("/wallet", func(w http.ResponseWriter, r *http.Request) { if err := s.checkAuth(r); err != nil { http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) @@ -248,7 +245,6 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) { // initialize memory for websocket connections rpc.ws.connections = make(map[ntfnChan]*requestContexts) - rpc.ws.walletNotificationMaster = make(ntfnChan) rpc.ws.txNotifications = make(map[string]*list.List) rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List) rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List) diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 5d714c6c..76d5a664 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -34,6 +34,7 @@ type wsCommandHandler func(*rpcServer, btcjson.Cmd, handlerChans) (interface{}, var wsHandlers = map[string]wsCommandHandler{ "getcurrentnet": handleGetCurrentNet, "getbestblock": handleGetBestBlock, + "notifyblocks": handleNotifyBlocks, "notifynewtxs": handleNotifyNewTXs, "notifyspent": handleNotifySpent, "rescan": handleRescan, @@ -49,10 +50,6 @@ type wsContext struct { // wallet channel as the key. connections map[ntfnChan]*requestContexts - // Any chain notifications meant to be received by every connected - // wallet are sent across this channel. - walletNotificationMaster ntfnChan - // Map of address hash to list of notificationCtx. This is the global // list we actually use for notifications, we also keep a list in the // requestContexts to make removal from this list on connection close @@ -66,6 +63,16 @@ type wsContext struct { minedTxNotifications map[btcwire.ShaHash]*list.List } +// AddBlockUpdateRequest adds the request context to mark a wallet as +// having requested updates for connected and disconnected blocks. +func (r *wsContext) AddBlockUpdateRequest(n ntfnChan) { + r.Lock() + defer r.Unlock() + + rc := r.connections[n] + rc.blockUpdates = true +} + // AddTxRequest adds the request context for new transaction notifications. func (r *wsContext) AddTxRequest(n ntfnChan, addr string) { r.Lock() @@ -209,6 +216,11 @@ func (r *wsContext) CloseListeners(n ntfnChan) { // requestContexts holds all requests for a single wallet connection. type requestContexts struct { + // blockUpdates specifies whether a client has requested notifications + // for whenever blocks are connected or disconnected from the main + // chain. + blockUpdates bool + // txRequests is a set of addresses a wallet has requested transactions // updates for. It is maintained here so all requests can be removed // when a wallet disconnects. @@ -279,6 +291,13 @@ func handleGetBestBlock(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa return result, nil } +// handleNotifyBlocks implements the notifyblocks command extension for +// websocket connections. +func handleNotifyBlocks(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { + s.ws.AddBlockUpdateRequest(c.n) + return nil, nil +} + // handleNotifyNewTXs implements the notifynewtxs command extension for // websocket connections. func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { @@ -512,30 +531,6 @@ func (s *rpcServer) RemoveWalletListener(n ntfnChan) { s.ws.Unlock() } -// walletListenerDuplicator listens for new wallet listener channels -// and duplicates messages sent to walletNotificationMaster to all -// connected listeners. -func (s *rpcServer) walletListenerDuplicator() { - // Duplicate all messages sent across walletNotificationMaster to each - // listening wallet. -out: - for { - select { - case ntfn := <-s.ws.walletNotificationMaster: - s.ws.RLock() - for c := range s.ws.connections { - c <- ntfn - } - s.ws.RUnlock() - - case <-s.quit: - break out - } - } - - s.wg.Done() -} - // walletReqsNotifications is the handler function for websocket // connections from a btcwallet instance. It reads messages from wallet and // sends back replies, as well as notififying wallets of chain updates. @@ -705,7 +700,11 @@ func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) { // TODO: remove int32 type conversion. ntfn := btcws.NewBlockConnectedNtfn(hash.String(), int32(block.Height())) - s.ws.walletNotificationMaster <- ntfn + for ntfnChan, rc := range s.ws.connections { + if rc.blockUpdates { + ntfnChan <- ntfn + } + } // Inform any interested parties about txs mined in this block. s.ws.Lock() @@ -742,7 +741,11 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) { // TODO: remove int32 type conversion. ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(), int32(block.Height())) - s.ws.walletNotificationMaster <- ntfn + for ntfnChan, rc := range s.ws.connections { + if rc.blockUpdates { + ntfnChan <- ntfn + } + } } // NotifyBlockTXs creates and marshals a JSON message to notify wallets