From 1aeead0eeb09656f83fb12bf233140a606eb0604 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 18:07:17 -0700 Subject: [PATCH] chain: keep track of the best block within BitcoindClient In the previous commit, we modified our BitcoindClient struct to use a ConcurrentQueue struct to handle its notifications to the caller. Before this, the BitcoindClient had a goroutine that would handle these notifications in the background and detect when a OnBlockConnected notification was received in order to update the best block. Due to this logic being removed, we now keep track of the best block througout the struct as a whole. --- chain/bitcoind_client.go | 73 +++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 4876883..c322dcc 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -33,7 +33,9 @@ type BitcoindClient struct { zmqPollInterval time.Duration notificationQueue *ConcurrentQueue - currentBlock chan *waddrmgr.BlockStamp + + bestBlockMtx sync.RWMutex + bestBlock waddrmgr.BlockStamp clientMtx sync.RWMutex rescanUpdate chan interface{} @@ -75,7 +77,6 @@ func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, chainParams: chainParams, zmqConnect: zmqConnect, zmqPollInterval: zmqPollInterval, - currentBlock: make(chan *waddrmgr.BlockStamp), rescanUpdate: make(chan interface{}), watchOutPoints: make(map[wire.OutPoint]struct{}), watchAddrs: make(map[string]struct{}), @@ -374,6 +375,21 @@ func (c *BitcoindClient) Start() error { return errors.New("mismatched networks") } + // Get initial conditions. + bestHash, bestHeight, err := c.GetBestBlock() + if err != nil { + return err + } + bestHeader, err := c.GetBlockHeaderVerbose(bestHash) + if err != nil { + return err + } + c.bestBlock = waddrmgr.BlockStamp{ + Height: bestHeight, + Hash: *bestHash, + Timestamp: time.Unix(bestHeader.Time, 0), + } + // Connect a ZMQ socket for block notifications zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock", "rawtx"}, c.zmqPollInterval) @@ -435,12 +451,11 @@ func (c *BitcoindClient) SetStartTime(startTime time.Time) { // BlockStamp returns the latest block notified by the client, or an error // if the client has been shut down. func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { - select { - case bs := <-c.currentBlock: - return bs, nil - case <-c.quit: - return nil, errors.New("disconnected") - } + c.bestBlockMtx.RLock() + bestBlock := c.bestBlock + c.bestBlockMtx.RUnlock() + + return &bestBlock, nil } func (c *BitcoindClient) onClientConnect() { @@ -539,23 +554,6 @@ func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) { log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect) c.onClientConnect() - // Get initial conditions. - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - log.Error(err) - return - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - log.Error(err) - return - } - bs := &waddrmgr.BlockStamp{ - Height: bestHeight, - Hash: *bestHash, - Timestamp: time.Unix(bestHeader.Time, 0), - } - mainLoop: for { selectLoop: @@ -630,8 +628,7 @@ mainLoop: // We're rescanning from the passed hash. case *chainhash.Hash: - err = c.rescan(e) - if err != nil { + if err := c.rescan(e); err != nil { log.Errorf("rescan failed: %s", err) } @@ -684,20 +681,24 @@ mainLoop: // Check if the block is logically next. If not, we // have a reorg. - if block.Header.PrevBlock == bs.Hash { + c.bestBlockMtx.Lock() + bestBlock := c.bestBlock + if block.Header.PrevBlock == bestBlock.Hash { // No reorg. Notify the subscriber of the block. - bs.Hash = block.BlockHash() - bs.Height++ - bs.Timestamp = block.Header.Timestamp - _, err = c.filterBlock(block, bs.Height, true) + bestBlock.Hash = block.BlockHash() + bestBlock.Height++ + bestBlock.Timestamp = block.Header.Timestamp + _, err = c.filterBlock(block, bestBlock.Height, true) if err != nil { log.Error(err) } + c.bestBlockMtx.Unlock() continue mainLoop } + c.bestBlockMtx.Unlock() // We have a reorg. - err = c.reorg(bs, block) + err = c.reorg(bestBlock, block) if err != nil { log.Errorf("Error during reorg: %v", err) } @@ -711,7 +712,7 @@ mainLoop: // reorg processes a reorganization during chain synchronization. This is // separate from a rescan's handling of a reorg. -func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) error { +func (c *BitcoindClient) reorg(bs waddrmgr.BlockStamp, block *wire.MsgBlock) error { // We rewind until we find a common ancestor between the known chain //and the current chain, and then fast forward again. This relies on // being able to fetch both from bitcoind; to change that would require @@ -786,6 +787,10 @@ func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) er reorgBlocks.Remove(reorgBlocks.Front()) } + c.bestBlockMtx.Lock() + c.bestBlock = bs + c.bestBlockMtx.Unlock() + return nil }