chain: keep track of the best block within BitcoindClient
In the previous commit, we modified our BitcoindClient struct to use a ConcurrentQueue struct to handle its notifications to the caller. Before this, the BitcoindClient had a goroutine that would handle these notifications in the background and detect when a OnBlockConnected notification was received in order to update the best block. Due to this logic being removed, we now keep track of the best block througout the struct as a whole.
This commit is contained in:
parent
2091ac0936
commit
1aeead0eeb
1 changed files with 39 additions and 34 deletions
|
@ -33,7 +33,9 @@ type BitcoindClient struct {
|
||||||
zmqPollInterval time.Duration
|
zmqPollInterval time.Duration
|
||||||
|
|
||||||
notificationQueue *ConcurrentQueue
|
notificationQueue *ConcurrentQueue
|
||||||
currentBlock chan *waddrmgr.BlockStamp
|
|
||||||
|
bestBlockMtx sync.RWMutex
|
||||||
|
bestBlock waddrmgr.BlockStamp
|
||||||
|
|
||||||
clientMtx sync.RWMutex
|
clientMtx sync.RWMutex
|
||||||
rescanUpdate chan interface{}
|
rescanUpdate chan interface{}
|
||||||
|
@ -75,7 +77,6 @@ func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass,
|
||||||
chainParams: chainParams,
|
chainParams: chainParams,
|
||||||
zmqConnect: zmqConnect,
|
zmqConnect: zmqConnect,
|
||||||
zmqPollInterval: zmqPollInterval,
|
zmqPollInterval: zmqPollInterval,
|
||||||
currentBlock: make(chan *waddrmgr.BlockStamp),
|
|
||||||
rescanUpdate: make(chan interface{}),
|
rescanUpdate: make(chan interface{}),
|
||||||
watchOutPoints: make(map[wire.OutPoint]struct{}),
|
watchOutPoints: make(map[wire.OutPoint]struct{}),
|
||||||
watchAddrs: make(map[string]struct{}),
|
watchAddrs: make(map[string]struct{}),
|
||||||
|
@ -374,6 +375,21 @@ func (c *BitcoindClient) Start() error {
|
||||||
return errors.New("mismatched networks")
|
return errors.New("mismatched networks")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get initial conditions.
|
||||||
|
bestHash, bestHeight, err := c.GetBestBlock()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bestHeader, err := c.GetBlockHeaderVerbose(bestHash)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.bestBlock = waddrmgr.BlockStamp{
|
||||||
|
Height: bestHeight,
|
||||||
|
Hash: *bestHash,
|
||||||
|
Timestamp: time.Unix(bestHeader.Time, 0),
|
||||||
|
}
|
||||||
|
|
||||||
// Connect a ZMQ socket for block notifications
|
// Connect a ZMQ socket for block notifications
|
||||||
zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock",
|
zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock",
|
||||||
"rawtx"}, c.zmqPollInterval)
|
"rawtx"}, c.zmqPollInterval)
|
||||||
|
@ -435,12 +451,11 @@ func (c *BitcoindClient) SetStartTime(startTime time.Time) {
|
||||||
// BlockStamp returns the latest block notified by the client, or an error
|
// BlockStamp returns the latest block notified by the client, or an error
|
||||||
// if the client has been shut down.
|
// if the client has been shut down.
|
||||||
func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
|
func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
|
||||||
select {
|
c.bestBlockMtx.RLock()
|
||||||
case bs := <-c.currentBlock:
|
bestBlock := c.bestBlock
|
||||||
return bs, nil
|
c.bestBlockMtx.RUnlock()
|
||||||
case <-c.quit:
|
|
||||||
return nil, errors.New("disconnected")
|
return &bestBlock, nil
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *BitcoindClient) onClientConnect() {
|
func (c *BitcoindClient) onClientConnect() {
|
||||||
|
@ -539,23 +554,6 @@ func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) {
|
||||||
log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect)
|
log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect)
|
||||||
c.onClientConnect()
|
c.onClientConnect()
|
||||||
|
|
||||||
// Get initial conditions.
|
|
||||||
bestHash, bestHeight, err := c.GetBestBlock()
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
bestHeader, err := c.GetBlockHeaderVerbose(bestHash)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
bs := &waddrmgr.BlockStamp{
|
|
||||||
Height: bestHeight,
|
|
||||||
Hash: *bestHash,
|
|
||||||
Timestamp: time.Unix(bestHeader.Time, 0),
|
|
||||||
}
|
|
||||||
|
|
||||||
mainLoop:
|
mainLoop:
|
||||||
for {
|
for {
|
||||||
selectLoop:
|
selectLoop:
|
||||||
|
@ -630,8 +628,7 @@ mainLoop:
|
||||||
|
|
||||||
// We're rescanning from the passed hash.
|
// We're rescanning from the passed hash.
|
||||||
case *chainhash.Hash:
|
case *chainhash.Hash:
|
||||||
err = c.rescan(e)
|
if err := c.rescan(e); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Errorf("rescan failed: %s",
|
log.Errorf("rescan failed: %s",
|
||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
|
@ -684,20 +681,24 @@ mainLoop:
|
||||||
|
|
||||||
// Check if the block is logically next. If not, we
|
// Check if the block is logically next. If not, we
|
||||||
// have a reorg.
|
// have a reorg.
|
||||||
if block.Header.PrevBlock == bs.Hash {
|
c.bestBlockMtx.Lock()
|
||||||
|
bestBlock := c.bestBlock
|
||||||
|
if block.Header.PrevBlock == bestBlock.Hash {
|
||||||
// No reorg. Notify the subscriber of the block.
|
// No reorg. Notify the subscriber of the block.
|
||||||
bs.Hash = block.BlockHash()
|
bestBlock.Hash = block.BlockHash()
|
||||||
bs.Height++
|
bestBlock.Height++
|
||||||
bs.Timestamp = block.Header.Timestamp
|
bestBlock.Timestamp = block.Header.Timestamp
|
||||||
_, err = c.filterBlock(block, bs.Height, true)
|
_, err = c.filterBlock(block, bestBlock.Height, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
c.bestBlockMtx.Unlock()
|
||||||
continue mainLoop
|
continue mainLoop
|
||||||
}
|
}
|
||||||
|
c.bestBlockMtx.Unlock()
|
||||||
|
|
||||||
// We have a reorg.
|
// We have a reorg.
|
||||||
err = c.reorg(bs, block)
|
err = c.reorg(bestBlock, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error during reorg: %v", err)
|
log.Errorf("Error during reorg: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -711,7 +712,7 @@ mainLoop:
|
||||||
|
|
||||||
// reorg processes a reorganization during chain synchronization. This is
|
// reorg processes a reorganization during chain synchronization. This is
|
||||||
// separate from a rescan's handling of a reorg.
|
// separate from a rescan's handling of a reorg.
|
||||||
func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) error {
|
func (c *BitcoindClient) reorg(bs waddrmgr.BlockStamp, block *wire.MsgBlock) error {
|
||||||
// We rewind until we find a common ancestor between the known chain
|
// We rewind until we find a common ancestor between the known chain
|
||||||
//and the current chain, and then fast forward again. This relies on
|
//and the current chain, and then fast forward again. This relies on
|
||||||
// being able to fetch both from bitcoind; to change that would require
|
// being able to fetch both from bitcoind; to change that would require
|
||||||
|
@ -786,6 +787,10 @@ func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) er
|
||||||
reorgBlocks.Remove(reorgBlocks.Front())
|
reorgBlocks.Remove(reorgBlocks.Front())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.bestBlockMtx.Lock()
|
||||||
|
c.bestBlock = bs
|
||||||
|
c.bestBlockMtx.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue