diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 2023b4d..6b0584d 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -29,6 +29,10 @@ var ( // BitcoindClient represents a persistent client connection to a bitcoind server // for information regarding the current best block chain. type BitcoindClient struct { + // notifyBlocks signals whether the client is sending block + // notifications to the caller. This must be used atomically. + notifyBlocks uint32 + started int32 // To be used atomically. stopped int32 // To be used atomically. @@ -52,10 +56,6 @@ type BitcoindClient struct { bestBlockMtx sync.RWMutex bestBlock waddrmgr.BlockStamp - // notifyBlocks signals whether the client is sending block - // notifications to the caller. - notifyBlocks uint32 - // rescanUpdate is a channel will be sent items that we should match // transactions against while processing a chain rescan to determine if // they are relevant to the client. @@ -265,7 +265,42 @@ func (c *BitcoindClient) NotifyTx(txids []chainhash.Hash) error { // // NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) NotifyBlocks() error { - atomic.StoreUint32(&c.notifyBlocks, 1) + // We'll guard the goroutine being spawned below by the notifyBlocks + // variable we'll use atomically. We'll make sure to reset it in case of + // a failure before spawning the goroutine so that it can be retried. + if !atomic.CompareAndSwapUint32(&c.notifyBlocks, 0, 1) { + return nil + } + + // Re-evaluate our known best block since it's possible that blocks have + // occurred between now and when the client was created. This ensures we + // don't detect a new notified block as a potential reorg. + bestHash, bestHeight, err := c.GetBestBlock() + if err != nil { + atomic.StoreUint32(&c.notifyBlocks, 0) + return fmt.Errorf("unable to retrieve best block: %v", err) + } + bestHeader, err := c.GetBlockHeaderVerbose(bestHash) + if err != nil { + atomic.StoreUint32(&c.notifyBlocks, 0) + return fmt.Errorf("unable to retrieve header for best block: "+ + "%v", err) + } + + c.bestBlockMtx.Lock() + c.bestBlock.Hash = *bestHash + c.bestBlock.Height = bestHeight + c.bestBlock.Timestamp = time.Unix(bestHeader.Time, 0) + c.bestBlockMtx.Unlock() + + // Include the client in the set of rescan clients of the backing + // bitcoind connection in order to receive ZMQ event notifications for + // new blocks and transactions. + c.chainConn.AddClient(c) + + c.wg.Add(1) + go c.ntfnHandler() + return nil } @@ -437,14 +472,8 @@ func (c *BitcoindClient) Start() error { } c.bestBlockMtx.Unlock() - // Once the client has started successfully, we'll include it in the set - // of rescan clients of the backing bitcoind connection in order to - // received ZMQ event notifications. - c.chainConn.AddClient(c) - - c.wg.Add(2) + c.wg.Add(1) go c.rescanHandler() - go c.ntfnHandler() return nil } @@ -576,9 +605,9 @@ func (c *BitcoindClient) ntfnHandler() { // successor, so we'll update our best block to reflect // this and determine if this new block matches any of // our existing filters. - c.bestBlockMtx.Lock() + c.bestBlockMtx.RLock() bestBlock := c.bestBlock - c.bestBlockMtx.Unlock() + c.bestBlockMtx.RUnlock() if newBlock.Header.PrevBlock == bestBlock.Hash { newBlockHeight := bestBlock.Height + 1 _ = c.filterBlock(newBlock, newBlockHeight, true) @@ -734,8 +763,6 @@ func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, timestamp time.Time) { - log.Infof("Rescan finished at %d (%s)", height, hash) - select { case c.notificationQueue.ChanIn() <- &RescanFinished{ Hash: hash, @@ -762,8 +789,8 @@ func (c *BitcoindClient) reorg(currentBlock waddrmgr.BlockStamp, bestHash, err) } - log.Debugf("Possible reorg at block: height=%v, hash=%s", bestHash, - bestHeight) + log.Debugf("Possible reorg at block: height=%v, hash=%v", bestHeight, + bestHash) if bestHeight < currentBlock.Height { log.Debugf("Detected multiple reorgs: best_height=%v below "+