Ask for block(dis)connected updates.

This removes the last notification that was being sent unsolicited.
Since it is no longer needed, the code to duplicate notifications to
all clients has been removed.
This commit is contained in:
Josh Rickmar 2014-01-17 16:00:46 -05:00
parent 871481ce1b
commit 20e56d6eda
2 changed files with 33 additions and 34 deletions

View file

@ -148,9 +148,6 @@ func (s *rpcServer) Start() {
jsonRPCRead(w, r, s) jsonRPCRead(w, r, s)
}) })
s.wg.Add(1)
go s.walletListenerDuplicator()
rpcServeMux.HandleFunc("/wallet", func(w http.ResponseWriter, r *http.Request) { rpcServeMux.HandleFunc("/wallet", func(w http.ResponseWriter, r *http.Request) {
if err := s.checkAuth(r); err != nil { if err := s.checkAuth(r); err != nil {
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
@ -248,7 +245,6 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) {
// initialize memory for websocket connections // initialize memory for websocket connections
rpc.ws.connections = make(map[ntfnChan]*requestContexts) rpc.ws.connections = make(map[ntfnChan]*requestContexts)
rpc.ws.walletNotificationMaster = make(ntfnChan)
rpc.ws.txNotifications = make(map[string]*list.List) rpc.ws.txNotifications = make(map[string]*list.List)
rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List) rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List)
rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List) rpc.ws.minedTxNotifications = make(map[btcwire.ShaHash]*list.List)

View file

@ -34,6 +34,7 @@ type wsCommandHandler func(*rpcServer, btcjson.Cmd, handlerChans) (interface{},
var wsHandlers = map[string]wsCommandHandler{ var wsHandlers = map[string]wsCommandHandler{
"getcurrentnet": handleGetCurrentNet, "getcurrentnet": handleGetCurrentNet,
"getbestblock": handleGetBestBlock, "getbestblock": handleGetBestBlock,
"notifyblocks": handleNotifyBlocks,
"notifynewtxs": handleNotifyNewTXs, "notifynewtxs": handleNotifyNewTXs,
"notifyspent": handleNotifySpent, "notifyspent": handleNotifySpent,
"rescan": handleRescan, "rescan": handleRescan,
@ -49,10 +50,6 @@ type wsContext struct {
// wallet channel as the key. // wallet channel as the key.
connections map[ntfnChan]*requestContexts connections map[ntfnChan]*requestContexts
// Any chain notifications meant to be received by every connected
// wallet are sent across this channel.
walletNotificationMaster ntfnChan
// Map of address hash to list of notificationCtx. This is the global // Map of address hash to list of notificationCtx. This is the global
// list we actually use for notifications, we also keep a list in the // list we actually use for notifications, we also keep a list in the
// requestContexts to make removal from this list on connection close // requestContexts to make removal from this list on connection close
@ -66,6 +63,16 @@ type wsContext struct {
minedTxNotifications map[btcwire.ShaHash]*list.List minedTxNotifications map[btcwire.ShaHash]*list.List
} }
// AddBlockUpdateRequest adds the request context to mark a wallet as
// having requested updates for connected and disconnected blocks.
func (r *wsContext) AddBlockUpdateRequest(n ntfnChan) {
r.Lock()
defer r.Unlock()
rc := r.connections[n]
rc.blockUpdates = true
}
// AddTxRequest adds the request context for new transaction notifications. // AddTxRequest adds the request context for new transaction notifications.
func (r *wsContext) AddTxRequest(n ntfnChan, addr string) { func (r *wsContext) AddTxRequest(n ntfnChan, addr string) {
r.Lock() r.Lock()
@ -209,6 +216,11 @@ func (r *wsContext) CloseListeners(n ntfnChan) {
// requestContexts holds all requests for a single wallet connection. // requestContexts holds all requests for a single wallet connection.
type requestContexts struct { type requestContexts struct {
// blockUpdates specifies whether a client has requested notifications
// for whenever blocks are connected or disconnected from the main
// chain.
blockUpdates bool
// txRequests is a set of addresses a wallet has requested transactions // txRequests is a set of addresses a wallet has requested transactions
// updates for. It is maintained here so all requests can be removed // updates for. It is maintained here so all requests can be removed
// when a wallet disconnects. // when a wallet disconnects.
@ -279,6 +291,13 @@ func handleGetBestBlock(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa
return result, nil return result, nil
} }
// handleNotifyBlocks implements the notifyblocks command extension for
// websocket connections.
func handleNotifyBlocks(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) {
s.ws.AddBlockUpdateRequest(c.n)
return nil, nil
}
// handleNotifyNewTXs implements the notifynewtxs command extension for // handleNotifyNewTXs implements the notifynewtxs command extension for
// websocket connections. // websocket connections.
func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) {
@ -512,30 +531,6 @@ func (s *rpcServer) RemoveWalletListener(n ntfnChan) {
s.ws.Unlock() s.ws.Unlock()
} }
// walletListenerDuplicator listens for new wallet listener channels
// and duplicates messages sent to walletNotificationMaster to all
// connected listeners.
func (s *rpcServer) walletListenerDuplicator() {
// Duplicate all messages sent across walletNotificationMaster to each
// listening wallet.
out:
for {
select {
case ntfn := <-s.ws.walletNotificationMaster:
s.ws.RLock()
for c := range s.ws.connections {
c <- ntfn
}
s.ws.RUnlock()
case <-s.quit:
break out
}
}
s.wg.Done()
}
// walletReqsNotifications is the handler function for websocket // walletReqsNotifications is the handler function for websocket
// connections from a btcwallet instance. It reads messages from wallet and // connections from a btcwallet instance. It reads messages from wallet and
// sends back replies, as well as notififying wallets of chain updates. // sends back replies, as well as notififying wallets of chain updates.
@ -705,7 +700,11 @@ func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
// TODO: remove int32 type conversion. // TODO: remove int32 type conversion.
ntfn := btcws.NewBlockConnectedNtfn(hash.String(), int32(block.Height())) ntfn := btcws.NewBlockConnectedNtfn(hash.String(), int32(block.Height()))
s.ws.walletNotificationMaster <- ntfn for ntfnChan, rc := range s.ws.connections {
if rc.blockUpdates {
ntfnChan <- ntfn
}
}
// Inform any interested parties about txs mined in this block. // Inform any interested parties about txs mined in this block.
s.ws.Lock() s.ws.Lock()
@ -742,7 +741,11 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) {
// TODO: remove int32 type conversion. // TODO: remove int32 type conversion.
ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(), ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(),
int32(block.Height())) int32(block.Height()))
s.ws.walletNotificationMaster <- ntfn for ntfnChan, rc := range s.ws.connections {
if rc.blockUpdates {
ntfnChan <- ntfn
}
}
} }
// NotifyBlockTXs creates and marshals a JSON message to notify wallets // NotifyBlockTXs creates and marshals a JSON message to notify wallets