diff --git a/rpcclient.go b/rpcclient.go index 2d56510..6c9129c 100644 --- a/rpcclient.go +++ b/rpcclient.go @@ -53,12 +53,6 @@ const ( // prevent a single connection from causing a denial of service attack // with an unnecessarily large number of requests. maxConcurrentClientRequests = 20 - - // maxUnhandledNotifications is the maximum number of still marshaled - // and unhandled notifications. If this limit is reached, the - // btcrpcclient client notification handlers will begin blocking until - // an unhandled notification is processed. - maxUnhandledNotifications = 50 ) type blockSummary struct { @@ -91,32 +85,6 @@ type ( rescanProgress int32 ) -type notificationChan chan notification - -func (c notificationChan) onBlockConnected(hash *btcwire.ShaHash, height int32) { - c <- (blockConnected)(blockSummary{hash, height}) -} - -func (c notificationChan) onBlockDisconnected(hash *btcwire.ShaHash, height int32) { - c <- (blockDisconnected)(blockSummary{hash, height}) -} - -func (c notificationChan) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) { - c <- recvTx{tx, block} -} - -func (c notificationChan) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) { - c <- redeemingTx{tx, block} -} - -func (c notificationChan) onRescanFinished(height int32) { - c <- rescanFinished{error: nil} -} - -func (c notificationChan) onRescanProgress(height int32) { - c <- rescanProgress(height) -} - func (n blockConnected) handleNotification() error { // Update the blockstamp for the newly-connected block. bs := &wallet.BlockStamp{ @@ -304,14 +272,15 @@ func (n rescanProgress) handleNotification() error { type rpcClient struct { *btcrpcclient.Client // client to btcd - chainNotifications notificationChan + enqueueNotification chan notification + dequeueNotification chan notification wg sync.WaitGroup } func newRPCClient(certs []byte) (*rpcClient, error) { - ntfns := make(notificationChan, maxUnhandledNotifications) client := rpcClient{ - chainNotifications: ntfns, + enqueueNotification: make(chan notification), + dequeueNotification: make(chan notification), } initializedClient := make(chan struct{}) ntfnCallbacks := btcrpcclient.NotificationHandlers{ @@ -328,12 +297,12 @@ func newRPCClient(certs []byte) (*rpcClient, error) { client.Stop() } }, - OnBlockConnected: ntfns.onBlockConnected, - OnBlockDisconnected: ntfns.onBlockDisconnected, - OnRecvTx: ntfns.onRecvTx, - OnRedeemingTx: ntfns.onRedeemingTx, - OnRescanFinished: ntfns.onRescanFinished, - OnRescanProgress: ntfns.onRescanProgress, + OnBlockConnected: client.onBlockConnected, + OnBlockDisconnected: client.onBlockDisconnected, + OnRecvTx: client.onRecvTx, + OnRedeemingTx: client.onRedeemingTx, + OnRescanFinished: client.onRescanFinished, + OnRescanProgress: client.onRescanProgress, } conf := btcrpcclient.ConnConfig{ Host: cfg.RPCConnect, @@ -352,7 +321,8 @@ func newRPCClient(certs []byte) (*rpcClient, error) { } func (c *rpcClient) Start() { - c.wg.Add(1) + c.wg.Add(2) + go c.notificationQueue() go c.handleNotifications() } @@ -361,7 +331,7 @@ func (c *rpcClient) Stop() { log.Warn("Disconnecting chain server client connection") c.Client.Shutdown() } - close(c.chainNotifications) + close(c.enqueueNotification) } func (c *rpcClient) WaitForShutdown() { @@ -369,8 +339,83 @@ func (c *rpcClient) WaitForShutdown() { c.wg.Wait() } +func (c *rpcClient) onBlockConnected(hash *btcwire.ShaHash, height int32) { + c.enqueueNotification <- (blockConnected)(blockSummary{hash, height}) +} + +func (c *rpcClient) onBlockDisconnected(hash *btcwire.ShaHash, height int32) { + c.enqueueNotification <- (blockDisconnected)(blockSummary{hash, height}) +} + +func (c *rpcClient) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) { + c.enqueueNotification <- recvTx{tx, block} +} + +func (c *rpcClient) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) { + c.enqueueNotification <- redeemingTx{tx, block} +} + +func (c *rpcClient) onRescanProgress(height int32) { + c.enqueueNotification <- rescanProgress(height) +} + +func (c *rpcClient) onRescanFinished(height int32) { + c.enqueueNotification <- rescanFinished{error: nil} +} + +func (c *rpcClient) notificationQueue() { + // TODO: Rather than leaving this as an unbounded queue for all types of + // notifications, try dropping ones where a later enqueued notification + // can fully invalidate one waiting to be processed. For example, + // blockconnected notifications for greater block heights can remove the + // need to process earlier blockconnected notifications still waiting + // here. + + var q []notification + enqueue := c.enqueueNotification + var dequeue chan notification + var next notification +out: + for { + select { + case n, ok := <-enqueue: + if !ok { + // If no notifications are queued for handling, + // the queue is finished. + if len(q) == 0 { + break out + } + // nil channel so no more reads can occur. + enqueue = nil + continue + } + if len(q) == 0 { + next = n + dequeue = c.dequeueNotification + } + q = append(q, n) + + case dequeue <- next: + q[0] = nil + q = q[1:] + if len(q) != 0 { + next = q[0] + } else { + // If no more notifications can be enqueued, the + // queue is finished. + if enqueue == nil { + break out + } + dequeue = nil + } + } + } + close(c.dequeueNotification) + c.wg.Done() +} + func (c *rpcClient) handleNotifications() { - for n := range c.chainNotifications { + for n := range c.dequeueNotification { err := n.handleNotification() if err != nil { switch e := err.(type) {