Retain order when processing client notifications.

This change modifies the RPC server's notifiation manager from a
struct with requests, protected by a mutux, to two goroutines.  The
first maintains a queue of all notifications and control requests
(registering/unregistering notifications), while the second reads from
the queue and processes notifications and requests one at a time.

Previously, to prevent slowing down block and mempool processing, each
notification would be handled by spawning a new goroutine.  This lead
to cases where notifications would end up being sent to clients in a
different order than they were created.  Adding a queue keeps the
order of notifications originating from the same goroutine, while also
not slowing down processing while waiting for notifications to be
processed and sent.

ok @davecgh
This commit is contained in:
Josh Rickmar 2014-03-04 11:15:25 -05:00
parent aff33f1e3c
commit a7d5b365b1
4 changed files with 413 additions and 284 deletions

View file

@ -995,12 +995,9 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
b.server.txMemPool.RemoveDoubleSpends(tx) b.server.txMemPool.RemoveDoubleSpends(tx)
} }
// Notify frontends // Notify registered websocket clients
if r := b.server.rpcServer; r != nil { if r := b.server.rpcServer; r != nil {
go func() { r.ntfnMgr.NotifyBlockConnected(block)
r.ntfnMgr.NotifyBlockTXs(block)
r.ntfnMgr.NotifyBlockConnected(block)
}()
} }
// A block has been disconnected from the main block chain. // A block has been disconnected from the main block chain.
@ -1023,9 +1020,9 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
} }
} }
// Notify frontends // Notify registered websocket clients
if r := b.server.rpcServer; r != nil { if r := b.server.rpcServer; r != nil {
go r.ntfnMgr.NotifyBlockDisconnected(block) r.ntfnMgr.NotifyBlockDisconnected(block)
} }
} }
} }

View file

@ -911,13 +911,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe
// Notify websocket clients about mempool transactions. // Notify websocket clients about mempool transactions.
if mp.server.rpcServer != nil { if mp.server.rpcServer != nil {
go func() { mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew)
mp.server.rpcServer.ntfnMgr.NotifyForTx(tx, nil)
if isNew {
mp.server.rpcServer.ntfnMgr.NotifyForNewTx(tx)
}
}()
} }
return nil return nil

View file

@ -210,6 +210,8 @@ func (s *rpcServer) Start() {
s.wg.Done() s.wg.Done()
}(listener) }(listener)
} }
s.ntfnMgr.Start()
} }
// limitConnections responds with a 503 service unavailable and returns true if // limitConnections responds with a 503 service unavailable and returns true if
@ -297,6 +299,7 @@ func (s *rpcServer) Stop() error {
} }
} }
s.ntfnMgr.Shutdown() s.ntfnMgr.Shutdown()
s.ntfnMgr.WaitForShutdown()
close(s.quit) close(s.quit)
s.wg.Wait() s.wg.Wait()
rpcsLog.Infof("RPC server shutdown complete") rpcsLog.Infof("RPC server shutdown complete")

View file

@ -104,75 +104,263 @@ func (s *rpcServer) WebsocketHandler(conn *websocket.Conn, remoteAddr string,
// have registered for and notifies them accordingly. It is also used to keep // have registered for and notifies them accordingly. It is also used to keep
// track of all connected websocket clients. // track of all connected websocket clients.
type wsNotificationManager struct { type wsNotificationManager struct {
sync.Mutex
// server is the RPC server the notification manager is associated with. // server is the RPC server the notification manager is associated with.
server *rpcServer server *rpcServer
// queueNotification queues a notification for handling.
queueNotification chan interface{}
// notificationMsgs feeds notificationHandler with notifications
// and client (un)registeration requests from a queue as well as
// registeration and unregisteration requests from clients.
notificationMsgs chan interface{}
// Access channel for current number of connected clients.
numClients chan int
// Shutdown handling
wg sync.WaitGroup
quit chan struct{}
}
// queueHandler manages a queue of empty interfaces, reading from in and
// sending the oldest unsent to out. This handler stops when either of the
// in or quit channels are closed, and closes out before returning, without
// waiting to send any variables still remaining in the queue.
func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
var q []interface{}
var dequeue chan<- interface{}
skipQueue := out
var next interface{}
out:
for {
select {
case n, ok := <-in:
if !ok {
// Sender closed input channel.
break out
}
// Either send to out immediately if skipQueue is
// non-nil (queue is empty) and reader is ready,
// or append to the queue and send later.
select {
case skipQueue <- n:
default:
q = append(q, n)
dequeue = out
skipQueue = nil
next = q[0]
}
case dequeue <- next:
copy(q, q[1:])
q[len(q)-1] = nil // avoid leak
q = q[:len(q)-1]
if len(q) == 0 {
dequeue = nil
skipQueue = out
}
case <-quit:
break out
}
}
close(out)
}
// queueHandler maintains a queue of notifications and notification handler
// control messages.
func (m *wsNotificationManager) queueHandler() {
queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
m.wg.Done()
}
// NotifyBlockConnected passes a block newly-connected to the best chain
// to the notification manager for block and transaction notification
// processing.
func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) {
m.queueNotification <- (*notificationBlockConnected)(block)
}
// NotifyBlockDisconnected passes a block disconnected from the best chain
// to the notification manager for block notification processing.
func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) {
m.queueNotification <- (*notificationBlockDisconnected)(block)
}
// NotifyMempoolTx passes a transaction accepted by mempool to the
// notification manager for transaction notification processing. If
// isNew is true, the tx is is a new transaction, rather than one
// added to the mempool during a reorg.
func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) {
m.queueNotification <- &notificationTxAcceptedByMempool{
isNew: isNew,
tx: tx,
}
}
// Notification types
type notificationBlockConnected btcutil.Block
type notificationBlockDisconnected btcutil.Block
type notificationTxAcceptedByMempool struct {
isNew bool
tx *btcutil.Tx
}
// Notification control requests
type notificationRegisterClient wsClient
type notificationUnregisterClient wsClient
type notificationRegisterBlocks wsClient
type notificationUnregisterBlocks wsClient
type notificationRegisterNewMempoolTxs wsClient
type notificationUnregisterNewMempoolTxs wsClient
type notificationRegisterSpent struct {
wsc *wsClient
op *btcwire.OutPoint
}
type notificationUnregisterSpent struct {
wsc *wsClient
op *btcwire.OutPoint
}
type notificationRegisterAddr struct {
wsc *wsClient
addr string
}
type notificationUnregisterAddr struct {
wsc *wsClient
addr string
}
// notificationHandler reads notifications and control messages from the queue
// handler and processes one at a time.
func (m *wsNotificationManager) notificationHandler() {
// clients is a map of all currently connected websocket clients. // clients is a map of all currently connected websocket clients.
clients map[chan bool]*wsClient clients := make(map[chan bool]*wsClient)
// Maps used to hold lists of websocket clients to be notified on // Maps used to hold lists of websocket clients to be notified on
// certain events. Each websocket client also keeps maps for the events // certain events. Each websocket client also keeps maps for the events
// which have multiple triggers to make removal from these lists on // which have multiple triggers to make removal from these lists on
// connection close less horrendously expensive. // connection close less horrendously expensive.
blockNotifications map[chan bool]*wsClient //
txNotifications map[chan bool]*wsClient // Where possible, the quit channel is used as the unique id for a client
spentNotifications map[btcwire.OutPoint]map[chan bool]*wsClient // since it is quite a bit more efficient than using the entire struct.
addrNotifications map[string]map[chan bool]*wsClient blockNotifications := make(map[chan bool]*wsClient)
txNotifications := make(map[chan bool]*wsClient)
watchedOutPoints := make(map[btcwire.OutPoint]map[chan bool]*wsClient)
watchedAddrs := make(map[string]map[chan bool]*wsClient)
out:
for {
select {
case n, ok := <-m.notificationMsgs:
if !ok {
// queueHandler quit.
break out
}
switch n := n.(type) {
case *notificationBlockConnected:
block := (*btcutil.Block)(n)
if len(blockNotifications) != 0 {
m.notifyBlockConnected(blockNotifications,
block)
}
// Skip iterating through all txs if no
// tx notification requests exist.
if len(watchedOutPoints) == 0 && len(watchedAddrs) == 0 {
continue
}
for _, tx := range block.Transactions() {
m.notifyForTx(watchedOutPoints,
watchedAddrs, tx, block)
}
case *notificationBlockDisconnected:
m.notifyBlockDisconnected(blockNotifications,
(*btcutil.Block)(n))
case *notificationTxAcceptedByMempool:
if n.isNew && len(txNotifications) != 0 {
m.notifyForNewTx(txNotifications, n.tx)
}
m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil)
case *notificationRegisterBlocks:
wsc := (*wsClient)(n)
blockNotifications[wsc.quit] = wsc
case *notificationRegisterClient:
wsc := (*wsClient)(n)
clients[wsc.quit] = wsc
case *notificationUnregisterClient:
wsc := (*wsClient)(n)
// Remove any requests made by the client as well as
// the client itself.
delete(blockNotifications, wsc.quit)
delete(txNotifications, wsc.quit)
for k := range wsc.spentRequests {
op := k
m.removeSpentRequest(watchedOutPoints, wsc, &op)
}
for addr := range wsc.addrRequests {
m.removeAddrRequest(watchedAddrs, wsc, addr)
}
delete(clients, wsc.quit)
case *notificationRegisterSpent:
m.addSpentRequest(watchedOutPoints, n.wsc, n.op)
case *notificationUnregisterSpent:
m.removeSpentRequest(watchedOutPoints, n.wsc, n.op)
case *notificationRegisterAddr:
m.addAddrRequest(watchedAddrs, n.wsc, n.addr)
case *notificationUnregisterAddr:
m.removeAddrRequest(watchedAddrs, n.wsc, n.addr)
default:
rpcsLog.Warn("Unhandled notification type")
}
case m.numClients <- len(clients):
case <-m.quit:
// RPC server shutting down.
break out
}
}
for _, c := range clients {
c.Disconnect()
}
m.wg.Done()
} }
// NumClients returns the number of clients actively being served. // NumClients returns the number of clients actively being served.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) NumClients() int { func (m *wsNotificationManager) NumClients() int {
m.Lock() return <-m.numClients
defer m.Unlock()
return len(m.clients)
} }
// AddBlockUpdateRequest requests block update notifications to the passed // RegisterBlockUpdates requests block update notifications to the passed
// websocket client. // websocket client.
// func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) {
// This function is safe for concurrent access. m.queueNotification <- (*notificationRegisterBlocks)(wsc)
func (m *wsNotificationManager) AddBlockUpdateRequest(wsc *wsClient) {
m.Lock()
defer m.Unlock()
// Add the client to the map to notify when block updates are seen.
// Use the quit channel as a unique id for the client since it is quite
// a bit more efficient than using the entire struct.
m.blockNotifications[wsc.quit] = wsc
} }
// RemoveBlockUpdateRequest removes block update notifications for the passed // UnregisterBlockUpdates removes block update notifications for the passed
// websocket client. // websocket client.
// func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) {
// This function is safe for concurrent access. m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
func (m *wsNotificationManager) RemoveBlockUpdateRequest(wsc *wsClient) {
m.Lock()
defer m.Unlock()
// Delete the client from the map to notify when block updates are seen.
// Use the quit channel as a unique id for the client since it is quite
// a bit more efficient than using the entire struct.
delete(m.blockNotifications, wsc.quit)
} }
// NotifyBlockConnected notifies websocket clients that have registered for // notifyBlockConnected notifies websocket clients that have registered for
// block updates when a block is connected to the main chain. // block updates when a block is connected to the main chain.
// func (*wsNotificationManager) notifyBlockConnected(clients map[chan bool]*wsClient,
// This function is safe for concurrent access. block *btcutil.Block) {
func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) {
m.Lock()
defer m.Unlock()
// Nothing to do if there are no websocket clients registered to
// receive notifications that result from a newly connected block.
if len(m.blockNotifications) == 0 {
return
}
hash, err := block.Sha() hash, err := block.Sha()
if err != nil { if err != nil {
@ -188,23 +376,18 @@ func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) {
"%v", err) "%v", err)
return return
} }
for _, wsc := range m.blockNotifications { for _, wsc := range clients {
wsc.QueueNotification(marshalledJSON) wsc.QueueNotification(marshalledJSON)
} }
} }
// NotifyBlockDisconnected notifies websocket clients that have registered for // notifyBlockDisconnected notifies websocket clients that have registered for
// block updates when a block is disconnected from the main chain (due to a // block updates when a block is disconnected from the main chain (due to a
// reorganize). // reorganize).
// func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan bool]*wsClient, block *btcutil.Block) {
// This function is safe for concurrent access. // Skip notification creation if no clients have requested block
func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { // connected/disconnected notifications.
m.Lock() if len(clients) == 0 {
defer m.Unlock()
// Nothing to do if there are no websocket clients registered to
// receive notifications that result from a newly connected block.
if len(m.blockNotifications) == 0 {
return return
} }
@ -224,56 +407,27 @@ func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) {
"notification: %v", err) "notification: %v", err)
return return
} }
for _, wsc := range m.blockNotifications { for _, wsc := range clients {
wsc.QueueNotification(marshalledJSON) wsc.QueueNotification(marshalledJSON)
} }
} }
// AddNewTxRequest requests notifications to the passed websocket client when // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
// new transactions are added to the memory pool. // client when new transactions are added to the memory pool.
// func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {
// This function is safe for concurrent access. m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
func (m *wsNotificationManager) AddNewTxRequest(wsc *wsClient) {
m.Lock()
defer m.Unlock()
// Add the client to the map to notify when a new transaction is added
// to the memory pool. Use the quit channel as a unique id for the
// client since it is quite a bit more efficient than using the entire
// struct.
m.txNotifications[wsc.quit] = wsc
} }
// RemoveNewTxRequest removes notifications to the passed websocket client when // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
// new transaction are added to the memory pool. // client when new transaction are added to the memory pool.
// func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {
// This function is safe for concurrent access. m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
func (m *wsNotificationManager) RemoveNewTxRequest(wsc *wsClient) {
m.Lock()
defer m.Unlock()
// Delete the client from the map to notify when a new transaction is
// seen in the memory pool. Use the quit channel as a unique id for the
// client since it is quite a bit more efficient than using the entire
// struct.
delete(m.txNotifications, wsc.quit)
} }
// NotifyForNewTx notifies websocket clients that have registerd for updates // notifyForNewTx notifies websocket clients that have registerd for updates
// when a new transaction is added to the memory pool. // when a new transaction is added to the memory pool.
// func (m *wsNotificationManager) notifyForNewTx(clients map[chan bool]*wsClient, tx *btcutil.Tx) {
// This function is safe for concurrent access. txShaStr := tx.Sha().String()
func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) {
m.Lock()
defer m.Unlock()
// Nothing to do if there are no websocket clients registered to
// receive notifications about transactions added to the memory pool.
if len(m.txNotifications) == 0 {
return
}
txID := tx.Sha().String()
mtx := tx.MsgTx() mtx := tx.MsgTx()
var amount int64 var amount int64
@ -281,7 +435,7 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) {
amount += txOut.Value amount += txOut.Value
} }
ntfn := btcws.NewAllTxNtfn(txID, amount) ntfn := btcws.NewAllTxNtfn(txShaStr, amount)
marshalledJSON, err := json.Marshal(ntfn) marshalledJSON, err := json.Marshal(ntfn)
if err != nil { if err != nil {
rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error()) rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error())
@ -290,10 +444,12 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) {
var verboseNtfn *btcws.AllVerboseTxNtfn var verboseNtfn *btcws.AllVerboseTxNtfn
var marshalledJSONVerbose []byte var marshalledJSONVerbose []byte
for _, wsc := range m.txNotifications { for _, wsc := range clients {
if wsc.verboseTxUpdates { if wsc.verboseTxUpdates {
if verboseNtfn == nil { if verboseNtfn == nil {
rawTx, err := createTxRawResult(m.server.server.btcnet, txID, mtx, nil, 0, nil) net := m.server.server.btcnet
rawTx, err := createTxRawResult(net, txShaStr,
mtx, nil, 0, nil)
if err != nil { if err != nil {
return return
} }
@ -312,48 +468,59 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) {
} }
} }
// addSpentRequest is the internal function which implements the public // RegisterSpentRequest requests an notification when the passed outpoint is
// AddSpentRequest. See the comment for AddSpentRequest for more details. // confirmed spent (contained in a block connected to the main chain) for the
// // passed websocket client. The request is automatically removed once the
// This function MUST be called with the notification manager lock held. // notification has been sent.
func (m *wsNotificationManager) addSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { func (m *wsNotificationManager) RegisterSpentRequest(wsc *wsClient, op *btcwire.OutPoint) {
m.queueNotification <- &notificationRegisterSpent{
wsc: wsc,
op: op,
}
}
// addSpentRequest modifies a map of watched outpoints to sets of websocket
// clients to add a new request watch the outpoint op and create and send
// a notification when spent to the websocket client wsc.
func (*wsNotificationManager) addSpentRequest(ops map[btcwire.OutPoint]map[chan bool]*wsClient,
wsc *wsClient, op *btcwire.OutPoint) {
// Track the request in the client as well so it can be quickly be // Track the request in the client as well so it can be quickly be
// removed on disconnect. // removed on disconnect.
wsc.spentRequests[*op] = struct{}{} wsc.spentRequests[*op] = struct{}{}
// Add the client to the list to notify when the outpoint is seen. // Add the client to the list to notify when the outpoint is seen.
// Create the list as needed. // Create the list as needed.
cmap, ok := m.spentNotifications[*op] cmap, ok := ops[*op]
if !ok { if !ok {
cmap = make(map[chan bool]*wsClient) cmap = make(map[chan bool]*wsClient)
m.spentNotifications[*op] = cmap ops[*op] = cmap
} }
cmap[wsc.quit] = wsc cmap[wsc.quit] = wsc
} }
// AddSpentRequest requests an notification when the passed outpoint is // UnregisterSpentRequest removes a request from the passed websocket client
// confirmed spent (contained in a block connected to the main chain) for the // to be notified when the passed outpoint is confirmed spent (contained in a
// passed websocket client. The request is automatically removed once the // block connected to the main chain).
// notification has been sent. func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *btcwire.OutPoint) {
// m.queueNotification <- &notificationUnregisterSpent{
// This function is safe for concurrent access. wsc: wsc,
func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { op: op,
m.Lock() }
defer m.Unlock()
m.addSpentRequest(wsc, op)
} }
// removeSpentRequest is the internal function which implements the public // removeSpentRequest modifies a map of watched outpoints to remove the
// RemoveSpentRequest. See the comment for RemoveSpentRequest for more details. // websocket client wsc from the set of clients to be notified when a
// // watched outpoint is spent. If wsc is the last client, the outpoint
// This function MUST be called with the notification manager lock held. // key is removed from the map.
func (m *wsNotificationManager) removeSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { func (*wsNotificationManager) removeSpentRequest(ops map[btcwire.OutPoint]map[chan bool]*wsClient,
wsc *wsClient, op *btcwire.OutPoint) {
// Remove the request tracking from the client. // Remove the request tracking from the client.
delete(wsc.spentRequests, *op) delete(wsc.spentRequests, *op)
// Remove the client from the list to notify. // Remove the client from the list to notify.
notifyMap, ok := m.spentNotifications[*op] notifyMap, ok := ops[*op]
if !ok { if !ok {
rpcsLog.Warnf("Attempt to remove nonexistent spent request "+ rpcsLog.Warnf("Attempt to remove nonexistent spent request "+
"for websocket client %s", wsc.addr) "for websocket client %s", wsc.addr)
@ -361,25 +528,13 @@ func (m *wsNotificationManager) removeSpentRequest(wsc *wsClient, op *btcwire.Ou
} }
delete(notifyMap, wsc.quit) delete(notifyMap, wsc.quit)
// Remove the map entry altogether if there are no more clients // Remove the map entry altogether if there are
// interested in it. // no more clients interested in it.
if len(notifyMap) == 0 { if len(notifyMap) == 0 {
delete(m.spentNotifications, *op) delete(ops, *op)
} }
} }
// RemoveSpentRequest removes a request from the passed websocket client to be
// notified when the passed outpoint is confirmed spent (contained in a block
// connected to the main chain).
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) RemoveSpentRequest(wsc *wsClient, op *btcwire.OutPoint) {
m.Lock()
defer m.Unlock()
m.removeSpentRequest(wsc, op)
}
// txHexString returns the serialized transaction encoded in hexadecimal. // txHexString returns the serialized transaction encoded in hexadecimal.
func txHexString(tx *btcutil.Tx) string { func txHexString(tx *btcutil.Tx) string {
var buf bytes.Buffer var buf bytes.Buffer
@ -388,30 +543,52 @@ func txHexString(tx *btcutil.Tx) string {
return hex.EncodeToString(buf.Bytes()) return hex.EncodeToString(buf.Bytes())
} }
// blockDetails creates a BlockDetails struct to include in btcws notifications
// from a block and a transaction's block index.
func blockDetails(block *btcutil.Block, txIndex int) *btcws.BlockDetails {
if block == nil {
return nil
}
blockSha, _ := block.Sha() // never errors
return &btcws.BlockDetails{
Height: int32(block.Height()),
Hash: blockSha.String(),
Index: txIndex,
Time: block.MsgBlock().Header.Timestamp.Unix(),
}
}
// newRedeemingTxNotification returns a new marshalled redeemingtx notification
// with the passed parameters.
func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) {
// Create and marshal the notification.
ntfn := btcws.NewRedeemingTxNtfn(txHex, blockDetails(block, index))
return json.Marshal(ntfn)
}
// notifyForTxOuts examines each transaction output, notifying interested // notifyForTxOuts examines each transaction output, notifying interested
// websocket clients of the transaction if an output spends to a watched // websocket clients of the transaction if an output spends to a watched
// address. A spent notification request is automatically registered for // address. A spent notification request is automatically registered for
// the client for each matching output. // the client for each matching output.
// func (m *wsNotificationManager) notifyForTxOuts(ops map[btcwire.OutPoint]map[chan bool]*wsClient,
// This function MUST be called with the notification manager lock held. addrs map[string]map[chan bool]*wsClient, tx *btcutil.Tx, block *btcutil.Block) {
func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
// Nothing to do if nobody is listening for address notifications. // Nothing to do if nobody is listening for address notifications.
if len(m.addrNotifications) == 0 { if len(addrs) == 0 {
return return
} }
txHex := "" txHex := ""
wscNotified := make(map[chan bool]bool) wscNotified := make(map[chan bool]bool)
for i, txOut := range tx.MsgTx().TxOut { for i, txOut := range tx.MsgTx().TxOut {
_, addrs, _, err := btcscript.ExtractPkScriptAddrs( _, txAddrs, _, err := btcscript.ExtractPkScriptAddrs(
txOut.PkScript, m.server.server.btcnet) txOut.PkScript, m.server.server.btcnet)
if err != nil { if err != nil {
continue continue
} }
for _, addr := range addrs { for _, txAddr := range txAddrs {
encodedAddr := addr.EncodeAddress() cmap, ok := addrs[txAddr.EncodeAddress()]
cmap, ok := m.addrNotifications[encodedAddr]
if !ok { if !ok {
continue continue
} }
@ -429,7 +606,7 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B
op := btcwire.NewOutPoint(tx.Sha(), uint32(i)) op := btcwire.NewOutPoint(tx.Sha(), uint32(i))
for wscQuit, wsc := range cmap { for wscQuit, wsc := range cmap {
m.addSpentRequest(wsc, op) m.addSpentRequest(ops, wsc, op)
if !wscNotified[wscQuit] { if !wscNotified[wscQuit] {
wscNotified[wscQuit] = true wscNotified[wscQuit] = true
@ -440,36 +617,29 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B
} }
} }
// NotifyForTx examines the inputs and outputs of the passed transaction, // notifyForTx examines the inputs and outputs of the passed transaction,
// notifying websocket clients of outputs spending to a watched address // notifying websocket clients of outputs spending to a watched address
// and inputs spending a watched outpoint. // and inputs spending a watched outpoint.
// func (m *wsNotificationManager) notifyForTx(ops map[btcwire.OutPoint]map[chan bool]*wsClient,
// This function is safe for concurrent access. addrs map[string]map[chan bool]*wsClient, tx *btcutil.Tx, block *btcutil.Block) {
func (m *wsNotificationManager) NotifyForTx(tx *btcutil.Tx, block *btcutil.Block) {
m.Lock()
defer m.Unlock()
m.notifyForTxIns(tx, block) if len(ops) != 0 {
m.notifyForTxOuts(tx, block) m.notifyForTxIns(ops, tx, block)
} }
if len(addrs) != 0 {
// newRedeemingTxNotification returns a new marshalled redeemingtx notification m.notifyForTxOuts(ops, addrs, tx, block)
// with the passed parameters. }
func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) {
// Create and marshal the notification.
ntfn := btcws.NewRedeemingTxNtfn(txHex, blockDetails(block, index))
return json.Marshal(ntfn)
} }
// notifyForTxIns examines the inputs of the passed transaction and sends // notifyForTxIns examines the inputs of the passed transaction and sends
// interested websocket clients a redeemingtx notification if any inputs // interested websocket clients a redeemingtx notification if any inputs
// spend a watched output. If block is non-nil, any matching spent // spend a watched output. If block is non-nil, any matching spent
// requests are removed. // requests are removed.
// func (m *wsNotificationManager) notifyForTxIns(ops map[btcwire.OutPoint]map[chan bool]*wsClient,
// This function MUST be called with the notification manager lock held. tx *btcutil.Tx, block *btcutil.Block) {
func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Block) {
// Nothing to do if nobody is listening for spent notifications. // Nothing to do if nobody is watching outpoints.
if len(m.spentNotifications) == 0 { if len(ops) == 0 {
return return
} }
@ -477,7 +647,7 @@ func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Bl
wscNotified := make(map[chan bool]bool) wscNotified := make(map[chan bool]bool)
for _, txIn := range tx.MsgTx().TxIn { for _, txIn := range tx.MsgTx().TxIn {
prevOut := &txIn.PreviousOutpoint prevOut := &txIn.PreviousOutpoint
if cmap, ok := m.spentNotifications[*prevOut]; ok { if cmap, ok := ops[*prevOut]; ok {
if txHex == "" { if txHex == "" {
txHex = txHexString(tx) txHex = txHexString(tx)
} }
@ -488,7 +658,7 @@ func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Bl
} }
for wscQuit, wsc := range cmap { for wscQuit, wsc := range cmap {
if block != nil { if block != nil {
m.removeSpentRequest(wsc, prevOut) m.removeSpentRequest(ops, wsc, prevOut)
} }
if !wscNotified[wscQuit] { if !wscNotified[wscQuit] {
@ -500,144 +670,109 @@ func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Bl
} }
} }
// NotifyBlockTXs examines the input and outputs of the passed transaction // RegisterTxOutAddressRequest requests notifications to the passed websocket
// and sends websocket clients notifications they are interested in. // client when a transaction output spends to the passed address.
// func (m *wsNotificationManager) RegisterTxOutAddressRequest(wsc *wsClient, addr string) {
// This function is safe for concurrent access. m.queueNotification <- &notificationRegisterAddr{
func (m *wsNotificationManager) NotifyBlockTXs(block *btcutil.Block) { wsc: wsc,
m.Lock() addr: addr,
defer m.Unlock()
// Nothing to do if there are no websocket clients registered to receive
// notifications about spent outpoints or payments to addresses.
if len(m.spentNotifications) == 0 && len(m.addrNotifications) == 0 {
return
}
for _, tx := range block.Transactions() {
m.notifyForTxIns(tx, block)
m.notifyForTxOuts(tx, block)
} }
} }
func blockDetails(block *btcutil.Block, txIndex int) *btcws.BlockDetails { // addAddrRequest adds the websocket client wsc to the address to client set
if block == nil { // addrs so wsc will be notified for any mempool or block transaction outputs
return nil // spending to addr.
} func (*wsNotificationManager) addAddrRequest(addrs map[string]map[chan bool]*wsClient,
blockSha, _ := block.Sha() // never errors wsc *wsClient, addr string) {
return &btcws.BlockDetails{
Height: int32(block.Height()),
Hash: blockSha.String(),
Index: txIndex,
Time: block.MsgBlock().Header.Timestamp.Unix(),
}
}
// AddAddrRequest requests notifications to the passed websocket client when
// a transaction pays to the passed address.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) AddAddrRequest(wsc *wsClient, addr string) {
m.Lock()
defer m.Unlock()
// Track the request in the client as well so it can be quickly be // Track the request in the client as well so it can be quickly be
// removed on disconnect. // removed on disconnect.
wsc.addrRequests[addr] = struct{}{} wsc.addrRequests[addr] = struct{}{}
// Add the client to the list to notify when the outpoint is seen. // Add the client to the set of clients to notify when the outpoint is
// Create the list as needed. // seen. Create map as needed.
cmap, ok := m.addrNotifications[addr] cmap, ok := addrs[addr]
if !ok { if !ok {
cmap = make(map[chan bool]*wsClient) cmap = make(map[chan bool]*wsClient)
m.addrNotifications[addr] = cmap addrs[addr] = cmap
} }
cmap[wsc.quit] = wsc cmap[wsc.quit] = wsc
} }
// removeAddrRequest is the internal function which implements the public // UnregisterTxOutAddressRequest removes a request from the passed websocket
// RemoveAddrRequest. See the comment for RemoveAddrRequest for more details. // client to be notified when a transaction spends to the passed address.
// func (m *wsNotificationManager) UnregisterTxOutAddressRequest(wsc *wsClient, addr string) {
// This function MUST be called with the notification manager lock held. m.queueNotification <- &notificationUnregisterAddr{
func (m *wsNotificationManager) removeAddrRequest(wsc *wsClient, addr string) { wsc: wsc,
addr: addr,
}
}
// removeAddrRequest removes the websocket client wsc from the address to
// client set addrs so it will no longer receive notification updates for
// any transaction outputs send to addr.
func (*wsNotificationManager) removeAddrRequest(addrs map[string]map[chan bool]*wsClient,
wsc *wsClient, addr string) {
// Remove the request tracking from the client. // Remove the request tracking from the client.
delete(wsc.addrRequests, addr) delete(wsc.addrRequests, addr)
// Remove the client from the list to notify. // Remove the client from the list to notify.
notifyMap, ok := m.addrNotifications[addr] cmap, ok := addrs[addr]
if !ok { if !ok {
rpcsLog.Warnf("Attempt to remove nonexistent addr request "+ rpcsLog.Warnf("Attempt to remove nonexistent addr request "+
"<%s> for websocket client %s", addr, wsc.addr) "<%s> for websocket client %s", addr, wsc.addr)
return return
} }
delete(notifyMap, wsc.quit) delete(cmap, wsc.quit)
// Remove the map entry altogether if there are no more clients // Remove the map entry altogether if there are no more clients
// interested in it. // interested in it.
if len(notifyMap) == 0 { if len(cmap) == 0 {
delete(m.addrNotifications, addr) delete(addrs, addr)
} }
} }
// RemoveAddrRequest removes a request from the passed websocket client to be
// notified when a transaction pays to the passed address.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) RemoveAddrRequest(wsc *wsClient, addr string) {
m.Lock()
defer m.Unlock()
m.removeAddrRequest(wsc, addr)
}
// AddClient adds the passed websocket client to the notification manager. // AddClient adds the passed websocket client to the notification manager.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) AddClient(wsc *wsClient) { func (m *wsNotificationManager) AddClient(wsc *wsClient) {
m.Lock() m.queueNotification <- (*notificationRegisterClient)(wsc)
defer m.Unlock()
m.clients[wsc.quit] = wsc
} }
// RemoveClient removes the passed websocket client and all notifications // RemoveClient removes the passed websocket client and all notifications
// registered for it. // registered for it.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) RemoveClient(wsc *wsClient) { func (m *wsNotificationManager) RemoveClient(wsc *wsClient) {
m.Lock() m.queueNotification <- (*notificationUnregisterClient)(wsc)
defer m.Unlock()
// Remove any requests made by the client as well as the client itself.
delete(m.blockNotifications, wsc.quit)
delete(m.txNotifications, wsc.quit)
for k := range wsc.spentRequests {
op := k
m.removeSpentRequest(wsc, &op)
}
for addr := range wsc.addrRequests {
m.removeAddrRequest(wsc, addr)
}
delete(m.clients, wsc.quit)
} }
// Shutdown disconnects all websocket clients the manager knows about. // Start starts the goroutines required for the manager to queue and process
// websocket client notifications.
func (m *wsNotificationManager) Start() {
m.wg.Add(2)
go m.queueHandler()
go m.notificationHandler()
}
// WaitForShutdown blocks until all notification manager goroutines have
// finished.
func (m *wsNotificationManager) WaitForShutdown() {
m.wg.Wait()
}
// Shutdown shuts down the manager, stopping the notification queue and
// notification handler goroutines.
func (m *wsNotificationManager) Shutdown() { func (m *wsNotificationManager) Shutdown() {
for _, wsc := range m.clients { close(m.quit)
wsc.Disconnect()
}
} }
// newWsNotificationManager returns a new notification manager ready for use. // newWsNotificationManager returns a new notification manager ready for use.
// See wsNotificationManager for more details. // See wsNotificationManager for more details.
func newWsNotificationManager(server *rpcServer) *wsNotificationManager { func newWsNotificationManager(server *rpcServer) *wsNotificationManager {
return &wsNotificationManager{ return &wsNotificationManager{
server: server, server: server,
clients: make(map[chan bool]*wsClient), queueNotification: make(chan interface{}),
blockNotifications: make(map[chan bool]*wsClient), notificationMsgs: make(chan interface{}),
txNotifications: make(map[chan bool]*wsClient), numClients: make(chan int),
spentNotifications: make(map[btcwire.OutPoint]map[chan bool]*wsClient), quit: make(chan struct{}),
addrNotifications: make(map[string]map[chan bool]*wsClient),
} }
} }
@ -1228,7 +1363,7 @@ func handleGetCurrentNet(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson
// handleNotifyBlocks implements the notifyblocks command extension for // handleNotifyBlocks implements the notifyblocks command extension for
// websocket connections. // websocket connections.
func handleNotifyBlocks(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { func handleNotifyBlocks(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) {
wsc.server.ntfnMgr.AddBlockUpdateRequest(wsc) wsc.server.ntfnMgr.RegisterBlockUpdates(wsc)
return nil, nil return nil, nil
} }
@ -1240,7 +1375,7 @@ func handleNotifySpent(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.E
return nil, &btcjson.ErrInternal return nil, &btcjson.ErrInternal
} }
wsc.server.ntfnMgr.AddSpentRequest(wsc, cmd.OutPoint) wsc.server.ntfnMgr.RegisterSpentRequest(wsc, cmd.OutPoint)
return nil, nil return nil, nil
} }
@ -1253,7 +1388,7 @@ func handleNotifyAllNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjs
} }
wsc.verboseTxUpdates = cmd.Verbose wsc.verboseTxUpdates = cmd.Verbose
wsc.server.ntfnMgr.AddNewTxRequest(wsc) wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc)
return nil, nil return nil, nil
} }
@ -1275,7 +1410,7 @@ func handleNotifyNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.
return nil, &e return nil, &e
} }
wsc.server.ntfnMgr.AddAddrRequest(wsc, addr.EncodeAddress()) wsc.server.ntfnMgr.RegisterTxOutAddressRequest(wsc, addr.EncodeAddress())
} }
return nil, nil return nil, nil