Prevent blocking on RPC server messages after shutdown.

This change adds select statements to each channel write that may
occur from a non-RPCS goroutine to unblock once the server has begun
shutting down.  This prevents issues in clean shutdown where non-RPCS
goroutines may block indefinitely on message sends to the RPC server.
This commit is contained in:
Josh Rickmar 2014-03-21 18:07:36 -05:00
parent 715fd22de9
commit 29dd411457

View file

@ -181,13 +181,27 @@ func (m *wsNotificationManager) queueHandler() {
// to the notification manager for block and transaction notification // to the notification manager for block and transaction notification
// processing. // processing.
func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) {
m.queueNotification <- (*notificationBlockConnected)(block) // As NotifyBlockConnected will be called by the block manager
// and the RPC server may no longer be running, use a select
// statement to unblock enqueueing the notification once the RPC
// server has begun shutting down.
select {
case m.queueNotification <- (*notificationBlockConnected)(block):
case <-m.quit:
}
} }
// NotifyBlockDisconnected passes a block disconnected from the best chain // NotifyBlockDisconnected passes a block disconnected from the best chain
// to the notification manager for block notification processing. // to the notification manager for block notification processing.
func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) {
m.queueNotification <- (*notificationBlockDisconnected)(block) // As NotifyBlockDisconnected will be called by the block manager
// and the RPC server may no longer be running, use a select
// statement to unblock enqueueing the notification once the RPC
// server has begun shutting down.
select {
case m.queueNotification <- (*notificationBlockDisconnected)(block):
case <-m.quit:
}
} }
// NotifyMempoolTx passes a transaction accepted by mempool to the // NotifyMempoolTx passes a transaction accepted by mempool to the
@ -195,10 +209,19 @@ func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) {
// isNew is true, the tx is is a new transaction, rather than one // isNew is true, the tx is is a new transaction, rather than one
// added to the mempool during a reorg. // added to the mempool during a reorg.
func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) { func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) {
m.queueNotification <- &notificationTxAcceptedByMempool{ n := &notificationTxAcceptedByMempool{
isNew: isNew, isNew: isNew,
tx: tx, tx: tx,
} }
// As NotifyMempoolTx will be called by mempool and the RPC server
// may no longer be running, use a select statement to unblock
// enqueueing the notification once the RPC server has begun
// shutting down.
select {
case m.queueNotification <- n:
case <-m.quit:
}
} }
// Notification types // Notification types
@ -354,8 +377,12 @@ out:
} }
// NumClients returns the number of clients actively being served. // NumClients returns the number of clients actively being served.
func (m *wsNotificationManager) NumClients() int { func (m *wsNotificationManager) NumClients() (n int) {
return <-m.numClients select {
case n = <-m.numClients:
case <-m.quit: // Use default n (0) if server has shut down.
}
return
} }
// RegisterBlockUpdates requests block update notifications to the passed // RegisterBlockUpdates requests block update notifications to the passed