Use an unbounded queue for waiting notifications.

Fixes a hang where a send on the notification chan can block due to
the queue being filled, and the current running notification making a
blocking call to the rpc client.

Fixes #100.
This commit is contained in:
Josh Rickmar 2014-06-17 10:17:43 -05:00
parent 83e27ae7db
commit cb717455c7

View file

@ -53,12 +53,6 @@ const (
// prevent a single connection from causing a denial of service attack
// with an unnecessarily large number of requests.
maxConcurrentClientRequests = 20
// maxUnhandledNotifications is the maximum number of still marshaled
// and unhandled notifications. If this limit is reached, the
// btcrpcclient client notification handlers will begin blocking until
// an unhandled notification is processed.
maxUnhandledNotifications = 50
)
type blockSummary struct {
@ -91,32 +85,6 @@ type (
rescanProgress int32
)
type notificationChan chan notification
func (c notificationChan) onBlockConnected(hash *btcwire.ShaHash, height int32) {
c <- (blockConnected)(blockSummary{hash, height})
}
func (c notificationChan) onBlockDisconnected(hash *btcwire.ShaHash, height int32) {
c <- (blockDisconnected)(blockSummary{hash, height})
}
func (c notificationChan) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
c <- recvTx{tx, block}
}
func (c notificationChan) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
c <- redeemingTx{tx, block}
}
func (c notificationChan) onRescanFinished(height int32) {
c <- rescanFinished{error: nil}
}
func (c notificationChan) onRescanProgress(height int32) {
c <- rescanProgress(height)
}
func (n blockConnected) handleNotification() error {
// Update the blockstamp for the newly-connected block.
bs := &wallet.BlockStamp{
@ -304,14 +272,15 @@ func (n rescanProgress) handleNotification() error {
type rpcClient struct {
*btcrpcclient.Client // client to btcd
chainNotifications notificationChan
enqueueNotification chan notification
dequeueNotification chan notification
wg sync.WaitGroup
}
func newRPCClient(certs []byte) (*rpcClient, error) {
ntfns := make(notificationChan, maxUnhandledNotifications)
client := rpcClient{
chainNotifications: ntfns,
enqueueNotification: make(chan notification),
dequeueNotification: make(chan notification),
}
initializedClient := make(chan struct{})
ntfnCallbacks := btcrpcclient.NotificationHandlers{
@ -328,12 +297,12 @@ func newRPCClient(certs []byte) (*rpcClient, error) {
client.Stop()
}
},
OnBlockConnected: ntfns.onBlockConnected,
OnBlockDisconnected: ntfns.onBlockDisconnected,
OnRecvTx: ntfns.onRecvTx,
OnRedeemingTx: ntfns.onRedeemingTx,
OnRescanFinished: ntfns.onRescanFinished,
OnRescanProgress: ntfns.onRescanProgress,
OnBlockConnected: client.onBlockConnected,
OnBlockDisconnected: client.onBlockDisconnected,
OnRecvTx: client.onRecvTx,
OnRedeemingTx: client.onRedeemingTx,
OnRescanFinished: client.onRescanFinished,
OnRescanProgress: client.onRescanProgress,
}
conf := btcrpcclient.ConnConfig{
Host: cfg.RPCConnect,
@ -352,7 +321,8 @@ func newRPCClient(certs []byte) (*rpcClient, error) {
}
func (c *rpcClient) Start() {
c.wg.Add(1)
c.wg.Add(2)
go c.notificationQueue()
go c.handleNotifications()
}
@ -361,7 +331,7 @@ func (c *rpcClient) Stop() {
log.Warn("Disconnecting chain server client connection")
c.Client.Shutdown()
}
close(c.chainNotifications)
close(c.enqueueNotification)
}
func (c *rpcClient) WaitForShutdown() {
@ -369,8 +339,83 @@ func (c *rpcClient) WaitForShutdown() {
c.wg.Wait()
}
func (c *rpcClient) onBlockConnected(hash *btcwire.ShaHash, height int32) {
c.enqueueNotification <- (blockConnected)(blockSummary{hash, height})
}
func (c *rpcClient) onBlockDisconnected(hash *btcwire.ShaHash, height int32) {
c.enqueueNotification <- (blockDisconnected)(blockSummary{hash, height})
}
func (c *rpcClient) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
c.enqueueNotification <- recvTx{tx, block}
}
func (c *rpcClient) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
c.enqueueNotification <- redeemingTx{tx, block}
}
func (c *rpcClient) onRescanProgress(height int32) {
c.enqueueNotification <- rescanProgress(height)
}
func (c *rpcClient) onRescanFinished(height int32) {
c.enqueueNotification <- rescanFinished{error: nil}
}
func (c *rpcClient) notificationQueue() {
// TODO: Rather than leaving this as an unbounded queue for all types of
// notifications, try dropping ones where a later enqueued notification
// can fully invalidate one waiting to be processed. For example,
// blockconnected notifications for greater block heights can remove the
// need to process earlier blockconnected notifications still waiting
// here.
var q []notification
enqueue := c.enqueueNotification
var dequeue chan notification
var next notification
out:
for {
select {
case n, ok := <-enqueue:
if !ok {
// If no notifications are queued for handling,
// the queue is finished.
if len(q) == 0 {
break out
}
// nil channel so no more reads can occur.
enqueue = nil
continue
}
if len(q) == 0 {
next = n
dequeue = c.dequeueNotification
}
q = append(q, n)
case dequeue <- next:
q[0] = nil
q = q[1:]
if len(q) != 0 {
next = q[0]
} else {
// If no more notifications can be enqueued, the
// queue is finished.
if enqueue == nil {
break out
}
dequeue = nil
}
}
}
close(c.dequeueNotification)
c.wg.Done()
}
func (c *rpcClient) handleNotifications() {
for n := range c.chainNotifications {
for n := range c.dequeueNotification {
err := n.handleNotification()
if err != nil {
switch e := err.(type) {