chain: eliminate start of sync race condition in bitcoind back-end

This commit also allows bitcoind back-end to return block timestamp
in `BlockStamp()` and prevents logging and notification of events
until after the client wallet requests notifications.
This commit is contained in:
Alex 2018-03-13 21:07:40 -06:00 committed by Olaoluwa Osuntokun
parent 6d16463627
commit 81c4b1d096

View file

@ -7,6 +7,7 @@ import (
"errors" "errors"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/lightninglabs/gozmq" "github.com/lightninglabs/gozmq"
@ -41,8 +42,7 @@ type BitcoindClient struct {
watchOutPoints map[wire.OutPoint]struct{} watchOutPoints map[wire.OutPoint]struct{}
watchAddrs map[string]struct{} watchAddrs map[string]struct{}
watchTxIDs map[chainhash.Hash]struct{} watchTxIDs map[chainhash.Hash]struct{}
notifyBlocks bool notify uint32
notifyRecvd bool
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -188,6 +188,7 @@ func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32,
// NotifyReceived updates the watch list with the passed addresses. // NotifyReceived updates the watch list with the passed addresses.
func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error {
c.NotifyBlocks()
select { select {
case c.rescanUpdate <- addrs: case c.rescanUpdate <- addrs:
case <-c.quit: case <-c.quit:
@ -197,6 +198,7 @@ func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error {
// NotifySpent updates the watch list with the passed outPoints. // NotifySpent updates the watch list with the passed outPoints.
func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error {
c.NotifyBlocks()
select { select {
case c.rescanUpdate <- outPoints: case c.rescanUpdate <- outPoints:
case <-c.quit: case <-c.quit:
@ -206,6 +208,7 @@ func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error {
// NotifyTxIDs updates the watch list with the passed TxIDs. // NotifyTxIDs updates the watch list with the passed TxIDs.
func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error { func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error {
c.NotifyBlocks()
select { select {
case c.rescanUpdate <- txids: case c.rescanUpdate <- txids:
case <-c.quit: case <-c.quit:
@ -213,11 +216,17 @@ func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error {
return nil return nil
} }
// NotifyBlocks is always on. // NotifyBlocks enables notifications.
func (c *BitcoindClient) NotifyBlocks() error { func (c *BitcoindClient) NotifyBlocks() error {
atomic.StoreUint32(&c.notify, 1)
return nil return nil
} }
// notifying returns true if notifications have been turned on; false otherwise.
func (c *BitcoindClient) notifying() bool {
return (atomic.LoadUint32(&c.notify) == 1)
}
// LoadTxFilter updates the transaction watchlists for the client. Acceptable // LoadTxFilter updates the transaction watchlists for the client. Acceptable
// arguments after `reset` are any combination of []btcutil.Address, // arguments after `reset` are any combination of []btcutil.Address,
// []wire.OutPoint, []*wire.OutPoint, []chainhash.Hash, and []*chainhash.Hash. // []wire.OutPoint, []*wire.OutPoint, []chainhash.Hash, and []*chainhash.Hash.
@ -283,8 +292,7 @@ func (c *BitcoindClient) RescanBlocks(blockHashes []chainhash.Hash) (
continue continue
} }
relevantTxes, err := c.filterBlock(block, header.Height, relevantTxes, err := c.filterBlock(block, header.Height, false)
false)
if len(relevantTxes) > 0 { if len(relevantTxes) > 0 {
rescannedBlock := btcjson.RescannedBlock{ rescannedBlock := btcjson.RescannedBlock{
Hash: hash.String(), Hash: hash.String(),
@ -440,45 +448,51 @@ func (c *BitcoindClient) onClientConnect() {
} }
func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) {
select { if c.notifying() {
case c.enqueueNotification <- BlockConnected{ select {
Block: wtxmgr.Block{ case c.enqueueNotification <- BlockConnected{
Hash: *hash, Block: wtxmgr.Block{
Height: height, Hash: *hash,
}, Height: height,
Time: time, },
}: Time: time,
case <-c.quit: }:
case <-c.quit:
}
} }
} }
func (c *BitcoindClient) onFilteredBlockConnected(height int32, func (c *BitcoindClient) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) {
select { if c.notifying() {
case c.enqueueNotification <- FilteredBlockConnected{ select {
Block: &wtxmgr.BlockMeta{ case c.enqueueNotification <- FilteredBlockConnected{
Block: wtxmgr.Block{ Block: &wtxmgr.BlockMeta{
Hash: header.BlockHash(), Block: wtxmgr.Block{
Height: height, Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
}, },
Time: header.Timestamp, RelevantTxs: relevantTxs,
}, }:
RelevantTxs: relevantTxs, case <-c.quit:
}: }
case <-c.quit:
} }
} }
func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) { func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) {
select { if c.notifying() {
case c.enqueueNotification <- BlockDisconnected{ select {
Block: wtxmgr.Block{ case c.enqueueNotification <- BlockDisconnected{
Hash: *hash, Block: wtxmgr.Block{
Height: height, Hash: *hash,
}, Height: height,
Time: time, },
}: Time: time,
case <-c.quit: }:
case <-c.quit:
}
} }
} }
@ -523,11 +537,21 @@ func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) {
c.onClientConnect() c.onClientConnect()
// Get initial conditions. // Get initial conditions.
bs, err := c.BlockStamp() bestHash, bestHeight, err := c.GetBestBlock()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
} }
bestHeader, err := c.GetBlockHeaderVerbose(bestHash)
if err != nil {
log.Error(err)
return
}
bs := &waddrmgr.BlockStamp{
Height: bestHeight,
Hash: *bestHash,
Timestamp: time.Unix(bestHeader.Time, 0),
}
mainLoop: mainLoop:
for { for {
@ -651,6 +675,7 @@ mainLoop:
// No reorg. Notify the subscriber of the block. // No reorg. Notify the subscriber of the block.
bs.Hash = block.BlockHash() bs.Hash = block.BlockHash()
bs.Height++ bs.Height++
bs.Timestamp = block.Header.Timestamp
_, err = c.filterBlock(block, bs.Height, true) _, err = c.filterBlock(block, bs.Height, true)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -679,7 +704,7 @@ func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) er
// being able to fetch both from bitcoind; to change that would require // being able to fetch both from bitcoind; to change that would require
// changes in downstream code. // changes in downstream code.
// TODO: Make this more robust in order not to rely on this behavior. // TODO: Make this more robust in order not to rely on this behavior.
log.Infof("Possible reorg at block %s", block.BlockHash()) log.Debugf("Possible reorg at block %s", block.BlockHash())
knownHeader, err := c.GetBlockHeader(&bs.Hash) knownHeader, err := c.GetBlockHeader(&bs.Hash)
if err != nil { if err != nil {
return err return err
@ -694,7 +719,7 @@ func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) er
return err return err
} }
if bestHeight < bs.Height { if bestHeight < bs.Height {
log.Warn("multiple reorgs in a row") log.Debug("multiple reorgs in a row")
return nil return nil
} }
@ -729,6 +754,7 @@ func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) er
if err != nil { if err != nil {
return err return err
} }
bs.Timestamp = knownHeader.Timestamp
} }
// Disconnect the last block from the old chain. Since the PrevBlock is // Disconnect the last block from the old chain. Since the PrevBlock is
@ -916,8 +942,13 @@ func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32,
return nil, nil return nil, nil
} }
log.Debugf("Filtering block %d (%s) with %d transactions", height, // Only mention that we're filtering a block if the client wallet has
block.BlockHash(), len(block.Transactions)) // started monitoring the chain.
if !c.notifying() {
log.Debugf("Filtering block %d (%s) with %d transactions",
height, block.BlockHash(), len(block.Transactions))
}
// Create block details for notifications. // Create block details for notifications.
blockHash := block.BlockHash() blockHash := block.BlockHash()
blockDetails := &btcjson.BlockDetails{ blockDetails := &btcjson.BlockDetails{