From 8e9e4d69262b08a8188bb54d305fe5a7d86b8bfe Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 17:04:24 -0700 Subject: [PATCH 01/11] chain: rename bitcoind.go to bitcoind_client.go --- chain/{bitcoind.go => bitcoind_client.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename chain/{bitcoind.go => bitcoind_client.go} (100%) diff --git a/chain/bitcoind.go b/chain/bitcoind_client.go similarity index 100% rename from chain/bitcoind.go rename to chain/bitcoind_client.go From fc73cc96785159571b360e5b761c3070a9201bd6 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 21 Jun 2018 17:38:23 -0700 Subject: [PATCH 02/11] chain: add concurrent unbounded queue implementation --- chain/queue.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 chain/queue.go diff --git a/chain/queue.go b/chain/queue.go new file mode 100644 index 0000000..b30515b --- /dev/null +++ b/chain/queue.go @@ -0,0 +1,88 @@ +package chain + +import ( + "container/list" +) + +// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. +// Clients interact with the queue by pushing items into the in channel and +// popping items from the out channel. There is a goroutine that manages moving +// items from the in channel to the out channel in the correct order that must +// be started by calling Start(). +type ConcurrentQueue struct { + chanIn chan interface{} + chanOut chan interface{} + quit chan struct{} + overflow *list.List +} + +// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is +// the capacity of the output channel. When the size of the queue is below this +// threshold, pushes do not incur the overhead of the less efficient overflow +// structure. +func NewConcurrentQueue(bufferSize int) *ConcurrentQueue { + return &ConcurrentQueue{ + chanIn: make(chan interface{}), + chanOut: make(chan interface{}, bufferSize), + quit: make(chan struct{}), + overflow: list.New(), + } +} + +// ChanIn returns a channel that can be used to push new items into the queue. +func (cq *ConcurrentQueue) ChanIn() chan<- interface{} { + return cq.chanIn +} + +// ChanOut returns a channel that can be used to pop items from the queue. +func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { + return cq.chanOut +} + +// Start begins a goroutine that manages moving items from the in channel to +// the out channel. The queue tries to move items directly to the out channel +// minimize overhead, but if the out channel is full it pushes items to an +// overflow queue. This must be called before using the queue. +func (cq *ConcurrentQueue) Start() { + go func() { + for { + nextElement := cq.overflow.Front() + if nextElement == nil { + // The overflow queue is empty, so incoming + // items can be pushed directly to the output + // channel. However, if output channel is full, + // we'll push to the overflow list instead. + select { + case item := <-cq.chanIn: + select { + case cq.chanOut <- item: + case <-cq.quit: + return + default: + cq.overflow.PushBack(item) + } + case <-cq.quit: + return + } + } else { + // The overflow queue is not empty, so any new + // items get pushed to the back to preserve + // order. + select { + case item := <-cq.chanIn: + cq.overflow.PushBack(item) + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + } + } + }() +} + +// Stop ends the goroutine that moves items from the in channel to the out +// channel. +func (cq *ConcurrentQueue) Stop() { + close(cq.quit) +} From 2091ac0936f41dc1c68616fdc6631af5b16457b6 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 17:24:23 -0700 Subject: [PATCH 03/11] chain: use ConcurrentQueue within BitcoindClient to handle event notifications --- chain/bitcoind_client.go | 131 +++++++-------------------------------- 1 file changed, 22 insertions(+), 109 deletions(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 0e10ec1..4876883 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -32,9 +32,8 @@ type BitcoindClient struct { zmqConnect string zmqPollInterval time.Duration - enqueueNotification chan interface{} - dequeueNotification chan interface{} - currentBlock chan *waddrmgr.BlockStamp + notificationQueue *ConcurrentQueue + currentBlock chan *waddrmgr.BlockStamp clientMtx sync.RWMutex rescanUpdate chan interface{} @@ -73,19 +72,17 @@ func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, DisableTLS: true, HTTPPostMode: true, }, - chainParams: chainParams, - zmqConnect: zmqConnect, - zmqPollInterval: zmqPollInterval, - enqueueNotification: make(chan interface{}), - dequeueNotification: make(chan interface{}), - currentBlock: make(chan *waddrmgr.BlockStamp), - rescanUpdate: make(chan interface{}), - watchOutPoints: make(map[wire.OutPoint]struct{}), - watchAddrs: make(map[string]struct{}), - watchTxIDs: make(map[chainhash.Hash]struct{}), - quit: make(chan struct{}), - memPool: make(map[chainhash.Hash]struct{}), - memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), + chainParams: chainParams, + zmqConnect: zmqConnect, + zmqPollInterval: zmqPollInterval, + currentBlock: make(chan *waddrmgr.BlockStamp), + rescanUpdate: make(chan interface{}), + watchOutPoints: make(map[wire.OutPoint]struct{}), + watchAddrs: make(map[string]struct{}), + watchTxIDs: make(map[chainhash.Hash]struct{}), + quit: make(chan struct{}), + memPool: make(map[chainhash.Hash]struct{}), + memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), } rpcClient, err := rpcclient.New(client.connConfig, nil) if err != nil { @@ -389,7 +386,6 @@ func (c *BitcoindClient) Start() error { c.quitMtx.Unlock() c.wg.Add(2) - go c.handler() go c.socketHandler(zmqClient) return nil } @@ -403,10 +399,7 @@ func (c *BitcoindClient) Stop() { default: close(c.quit) c.client.Shutdown() - - if !c.started { - close(c.dequeueNotification) - } + c.notificationQueue.Stop() } c.quitMtx.Unlock() } @@ -423,7 +416,7 @@ func (c *BitcoindClient) WaitForShutdown() { // may abort for running out memory, as unread notifications are queued for // later reads. func (c *BitcoindClient) Notifications() <-chan interface{} { - return c.dequeueNotification + return c.notificationQueue.ChanOut() } // SetStartTime is a non-interface method to set the birthday of the wallet @@ -452,7 +445,7 @@ func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { func (c *BitcoindClient) onClientConnect() { select { - case c.enqueueNotification <- ClientConnected{}: + case c.notificationQueue.ChanIn() <- ClientConnected{}: case <-c.quit: } } @@ -460,7 +453,7 @@ func (c *BitcoindClient) onClientConnect() { func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { if c.notifying() { select { - case c.enqueueNotification <- BlockConnected{ + case c.notificationQueue.ChanIn() <- BlockConnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, @@ -476,7 +469,7 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32, header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { if c.notifying() { select { - case c.enqueueNotification <- FilteredBlockConnected{ + case c.notificationQueue.ChanIn() <- FilteredBlockConnected{ Block: &wtxmgr.BlockMeta{ Block: wtxmgr.Block{ Hash: header.BlockHash(), @@ -494,7 +487,7 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32, func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) { if c.notifying() { select { - case c.enqueueNotification <- BlockDisconnected{ + case c.notificationQueue.ChanIn() <- BlockDisconnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, @@ -516,14 +509,14 @@ func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord, } select { - case c.enqueueNotification <- RelevantTx{rec, blk}: + case c.notificationQueue.ChanIn() <- RelevantTx{rec, blk}: case <-c.quit: } } func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) { select { - case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}: + case c.notificationQueue.ChanIn() <- &RescanProgress{hash, height, blkTime}: case <-c.quit: } } @@ -531,7 +524,7 @@ func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, bl func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) { log.Infof("Rescan finished at %d (%s)", height, hash) select { - case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}: + case c.notificationQueue.ChanIn() <- &RescanFinished{hash, height, blkTime}: case <-c.quit: } @@ -1177,83 +1170,3 @@ func (c *BitcoindClient) filterTx(tx *wire.MsgTx, return notifyTx, rec, nil } - -// handler maintains a queue of notifications and the current state (best -// block) of the chain. -func (c *BitcoindClient) handler() { - hash, height, err := c.GetBestBlock() - if err != nil { - log.Errorf("Failed to receive best block from chain server: %v", err) - c.Stop() - c.wg.Done() - return - } - - bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height} - - // TODO: Rather than leaving this as an unbounded queue for all types of - // notifications, try dropping ones where a later enqueued notification - // can fully invalidate one waiting to be processed. For example, - // blockconnected notifications for greater block heights can remove the - // need to process earlier blockconnected notifications still waiting - // here. - - // TODO(aakselrod): Factor this logic out so it can be reused for each - // chain back end, rather than copying it. - - var notifications []interface{} - enqueue := c.enqueueNotification - var dequeue chan interface{} - var next interface{} -out: - for { - select { - case n, ok := <-enqueue: - if !ok { - // If no notifications are queued for handling, - // the queue is finished. - if len(notifications) == 0 { - break out - } - // nil channel so no more reads can occur. - enqueue = nil - continue - } - if len(notifications) == 0 { - next = n - dequeue = c.dequeueNotification - } - notifications = append(notifications, n) - - case dequeue <- next: - if n, ok := next.(BlockConnected); ok { - bs = &waddrmgr.BlockStamp{ - Height: n.Height, - Hash: n.Hash, - } - } - - notifications[0] = nil - notifications = notifications[1:] - if len(notifications) != 0 { - next = notifications[0] - } else { - // If no more notifications can be enqueued, the - // queue is finished. - if enqueue == nil { - break out - } - dequeue = nil - } - - case c.currentBlock <- bs: - - case <-c.quit: - break out - } - } - - c.Stop() - close(c.dequeueNotification) - c.wg.Done() -} From 1aeead0eeb09656f83fb12bf233140a606eb0604 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 18:07:17 -0700 Subject: [PATCH 04/11] chain: keep track of the best block within BitcoindClient In the previous commit, we modified our BitcoindClient struct to use a ConcurrentQueue struct to handle its notifications to the caller. Before this, the BitcoindClient had a goroutine that would handle these notifications in the background and detect when a OnBlockConnected notification was received in order to update the best block. Due to this logic being removed, we now keep track of the best block througout the struct as a whole. --- chain/bitcoind_client.go | 73 +++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 4876883..c322dcc 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -33,7 +33,9 @@ type BitcoindClient struct { zmqPollInterval time.Duration notificationQueue *ConcurrentQueue - currentBlock chan *waddrmgr.BlockStamp + + bestBlockMtx sync.RWMutex + bestBlock waddrmgr.BlockStamp clientMtx sync.RWMutex rescanUpdate chan interface{} @@ -75,7 +77,6 @@ func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, chainParams: chainParams, zmqConnect: zmqConnect, zmqPollInterval: zmqPollInterval, - currentBlock: make(chan *waddrmgr.BlockStamp), rescanUpdate: make(chan interface{}), watchOutPoints: make(map[wire.OutPoint]struct{}), watchAddrs: make(map[string]struct{}), @@ -374,6 +375,21 @@ func (c *BitcoindClient) Start() error { return errors.New("mismatched networks") } + // Get initial conditions. + bestHash, bestHeight, err := c.GetBestBlock() + if err != nil { + return err + } + bestHeader, err := c.GetBlockHeaderVerbose(bestHash) + if err != nil { + return err + } + c.bestBlock = waddrmgr.BlockStamp{ + Height: bestHeight, + Hash: *bestHash, + Timestamp: time.Unix(bestHeader.Time, 0), + } + // Connect a ZMQ socket for block notifications zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock", "rawtx"}, c.zmqPollInterval) @@ -435,12 +451,11 @@ func (c *BitcoindClient) SetStartTime(startTime time.Time) { // BlockStamp returns the latest block notified by the client, or an error // if the client has been shut down. func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { - select { - case bs := <-c.currentBlock: - return bs, nil - case <-c.quit: - return nil, errors.New("disconnected") - } + c.bestBlockMtx.RLock() + bestBlock := c.bestBlock + c.bestBlockMtx.RUnlock() + + return &bestBlock, nil } func (c *BitcoindClient) onClientConnect() { @@ -539,23 +554,6 @@ func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) { log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect) c.onClientConnect() - // Get initial conditions. - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - log.Error(err) - return - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - log.Error(err) - return - } - bs := &waddrmgr.BlockStamp{ - Height: bestHeight, - Hash: *bestHash, - Timestamp: time.Unix(bestHeader.Time, 0), - } - mainLoop: for { selectLoop: @@ -630,8 +628,7 @@ mainLoop: // We're rescanning from the passed hash. case *chainhash.Hash: - err = c.rescan(e) - if err != nil { + if err := c.rescan(e); err != nil { log.Errorf("rescan failed: %s", err) } @@ -684,20 +681,24 @@ mainLoop: // Check if the block is logically next. If not, we // have a reorg. - if block.Header.PrevBlock == bs.Hash { + c.bestBlockMtx.Lock() + bestBlock := c.bestBlock + if block.Header.PrevBlock == bestBlock.Hash { // No reorg. Notify the subscriber of the block. - bs.Hash = block.BlockHash() - bs.Height++ - bs.Timestamp = block.Header.Timestamp - _, err = c.filterBlock(block, bs.Height, true) + bestBlock.Hash = block.BlockHash() + bestBlock.Height++ + bestBlock.Timestamp = block.Header.Timestamp + _, err = c.filterBlock(block, bestBlock.Height, true) if err != nil { log.Error(err) } + c.bestBlockMtx.Unlock() continue mainLoop } + c.bestBlockMtx.Unlock() // We have a reorg. - err = c.reorg(bs, block) + err = c.reorg(bestBlock, block) if err != nil { log.Errorf("Error during reorg: %v", err) } @@ -711,7 +712,7 @@ mainLoop: // reorg processes a reorganization during chain synchronization. This is // separate from a rescan's handling of a reorg. -func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) error { +func (c *BitcoindClient) reorg(bs waddrmgr.BlockStamp, block *wire.MsgBlock) error { // We rewind until we find a common ancestor between the known chain //and the current chain, and then fast forward again. This relies on // being able to fetch both from bitcoind; to change that would require @@ -786,6 +787,10 @@ func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) er reorgBlocks.Remove(reorgBlocks.Front()) } + c.bestBlockMtx.Lock() + c.bestBlock = bs + c.bestBlockMtx.Unlock() + return nil } From 7df2b72beb09f2aee0fff7a5af2efa849719b596 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 17:49:35 -0700 Subject: [PATCH 05/11] chain: switch BitcoindClient to use atomic started/stopped pattern --- chain/bitcoind_client.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index c322dcc..c6edc88 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -25,6 +25,9 @@ import ( // BitcoindClient represents a persistent client connection to a bitcoind server // for information regarding the current best block chain. type BitcoindClient struct { + started int32 // To be used atomically. + stopped int32 // To be used atomically. + client *rpcclient.Client connConfig *rpcclient.ConnConfig // Work around unexported field chainParams *chaincfg.Params @@ -45,13 +48,11 @@ type BitcoindClient struct { watchTxIDs map[chainhash.Hash]struct{} notify uint32 - quit chan struct{} - wg sync.WaitGroup - started bool - quitMtx sync.Mutex - memPool map[chainhash.Hash]struct{} memPoolExp map[int32]map[chainhash.Hash]struct{} + + quit chan struct{} + wg sync.WaitGroup } // NewBitcoindClient creates a client connection to the server described by the @@ -364,6 +365,10 @@ func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, // function gives up, and therefore will not block forever waiting for the // connection to be established to a server that may not exist. func (c *BitcoindClient) Start() error { + if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { + return nil + } + // Verify that the server is running on the expected network. net, err := c.GetCurrentNet() if err != nil { @@ -397,10 +402,6 @@ func (c *BitcoindClient) Start() error { return err } - c.quitMtx.Lock() - c.started = true - c.quitMtx.Unlock() - c.wg.Add(2) go c.socketHandler(zmqClient) return nil @@ -409,15 +410,13 @@ func (c *BitcoindClient) Start() error { // Stop disconnects the client and signals the shutdown of all goroutines // started by Start. func (c *BitcoindClient) Stop() { - c.quitMtx.Lock() - select { - case <-c.quit: - default: - close(c.quit) - c.client.Shutdown() - c.notificationQueue.Stop() + if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { + return } - c.quitMtx.Unlock() + + close(c.quit) + c.client.Shutdown() + c.notificationQueue.Stop() } // WaitForShutdown blocks until both the client has finished disconnecting From 0b269d799e79b1a2249b30472cb1f7576697464a Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 18:08:19 -0700 Subject: [PATCH 06/11] chain: send empty struct instance rather than bool to reset filters --- chain/bitcoind_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index c6edc88..d06e924 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -236,7 +236,7 @@ func (c *BitcoindClient) LoadTxFilter(reset bool, // If we reset, signal that. if reset { select { - case c.rescanUpdate <- reset: + case c.rescanUpdate <- struct{}{}: case <-c.quit: return nil } From 27e22b1f79983d7afb2f18d8531786e778258676 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 18:14:21 -0700 Subject: [PATCH 07/11] chain: return err when updating the rescan filter while shutting down --- chain/bitcoind_client.go | 47 ++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index d06e924..692adb1 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -5,6 +5,7 @@ import ( "container/list" "encoding/hex" "errors" + "fmt" "net" "sync" "sync/atomic" @@ -22,6 +23,13 @@ import ( "github.com/lightninglabs/gozmq" ) +var ( + // ErrBitcoindClientShuttingDown is an error returned when we attempt + // to receive a notification for a specific item and the bitcoind client + // is in the middle of shutting down. + ErrBitcoindClientShuttingDown = errors.New("client is shutting down") +) + // BitcoindClient represents a persistent client connection to a bitcoind server // for information regarding the current best block chain. type BitcoindClient struct { @@ -191,7 +199,9 @@ func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { select { case c.rescanUpdate <- addrs: case <-c.quit: + return ErrBitcoindClientShuttingDown } + return nil } @@ -201,7 +211,9 @@ func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { select { case c.rescanUpdate <- outPoints: case <-c.quit: + return ErrBitcoindClientShuttingDown } + return nil } @@ -211,7 +223,9 @@ func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error { select { case c.rescanUpdate <- txids: case <-c.quit: + return ErrBitcoindClientShuttingDown } + return nil } @@ -238,45 +252,54 @@ func (c *BitcoindClient) LoadTxFilter(reset bool, select { case c.rescanUpdate <- struct{}{}: case <-c.quit: - return nil + return ErrBitcoindClientShuttingDown } } // This helper function will send an update to the filter. If the quit // channel is closed, it will allow the outer loop below to finish, // but skip over any updates as the quit case is triggered each time. - sendList := func(list interface{}) { + sendList := func(list interface{}) error { select { case c.rescanUpdate <- list: case <-c.quit: + return ErrBitcoindClientShuttingDown } + + return nil } + var err error for _, watchList := range watchLists { switch list := watchList.(type) { case map[wire.OutPoint]btcutil.Address: - sendList(list) + err = sendList(list) case []wire.OutPoint: - sendList(list) + err = sendList(list) case []*wire.OutPoint: - sendList(list) + err = sendList(list) case []btcutil.Address: - sendList(list) + err = sendList(list) case []chainhash.Hash: - sendList(list) + err = sendList(list) case []*chainhash.Hash: - sendList(list) + err = sendList(list) default: - log.Warnf("Couldn't add item to filter: unknown type") + return fmt.Errorf("unable to update filter: unknown "+ + "type %T", list) + } + if err != nil { + return err } } + return nil } @@ -332,21 +355,21 @@ func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, select { case c.rescanUpdate <- addrs: case <-c.quit: - return nil + return ErrBitcoindClientShuttingDown } // Update outpoints. select { case c.rescanUpdate <- outPoints: case <-c.quit: - return nil + return ErrBitcoindClientShuttingDown } // Kick off the rescan with the starting block hash. select { case c.rescanUpdate <- blockHash: case <-c.quit: - return nil + return ErrBitcoindClientShuttingDown } return nil From 00428d58280ad679dc2c9d7fb86c9d86329c634a Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 18:43:18 -0700 Subject: [PATCH 08/11] chain: share the same bitcoind connection between multiple rescan clients --- chain/bitcoind_client.go | 1231 ++++++++++++++++++++------------------ chain/bitcoind_conn.go | 362 +++++++++++ 2 files changed, 1001 insertions(+), 592 deletions(-) create mode 100644 chain/bitcoind_conn.go diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 692adb1..e0cb90a 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -1,12 +1,10 @@ package chain import ( - "bytes" "container/list" "encoding/hex" "errors" "fmt" - "net" "sync" "sync/atomic" "time" @@ -14,13 +12,11 @@ import ( "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/wtxmgr" - "github.com/lightninglabs/gozmq" ) var ( @@ -36,99 +32,90 @@ type BitcoindClient struct { started int32 // To be used atomically. stopped int32 // To be used atomically. - client *rpcclient.Client - connConfig *rpcclient.ConnConfig // Work around unexported field + // birthday is the earliest time for which we should begin scanning the + // chain. + birthday time.Time + + // chainParams are the parameters of the current chain this client is + // active under. chainParams *chaincfg.Params - zmqConnect string - zmqPollInterval time.Duration + // id is the unique ID of this client assigned by the backing bitcoind + // connection. + id uint64 - notificationQueue *ConcurrentQueue + // chainConn is the backing client to our rescan client that contains + // the RPC and ZMQ connections to a bitcoind node. + chainConn *BitcoindConn + // bestBlock keeps track of the tip of the current best chain. bestBlockMtx sync.RWMutex bestBlock waddrmgr.BlockStamp - clientMtx sync.RWMutex - rescanUpdate chan interface{} - startTime time.Time - watchOutPoints map[wire.OutPoint]struct{} - watchAddrs map[string]struct{} - watchTxIDs map[chainhash.Hash]struct{} - notify uint32 + // notifyBlocks signals whether the client is sending block + // notifications to the caller. + notifyBlocks uint32 - memPool map[chainhash.Hash]struct{} - memPoolExp map[int32]map[chainhash.Hash]struct{} + // rescanUpdate is a channel will be sent items that we should match + // transactions against while processing a chain rescan to determine if + // they are relevant to the client. + rescanUpdate chan interface{} + + // watchedAddresses, watchedOutPoints, and watchedTxs are the set of + // items we should match transactions against while processing a chain + // rescan to determine if they are relevant to the client. + watchMtx sync.RWMutex + watchedAddresses map[string]struct{} + watchedOutPoints map[wire.OutPoint]struct{} + watchedTxs map[chainhash.Hash]struct{} + + // mempool keeps track of all relevant transactions that have yet to be + // confirmed. This is used to shortcut the filtering process of a + // transaction when a new confirmed transaction notification is + // received. + // + // NOTE: This requires the watchMtx to be held. + mempool map[chainhash.Hash]struct{} + + // expiredMempool keeps track of a set of confirmed transactions along + // with the height at which they were included in a block. These + // transactions will then be removed from the mempool after a period of + // 288 blocks. This is done to ensure the transactions are safe from a + // reorg in the chain. + // + // NOTE: This requires the watchMtx to be held. + expiredMempool map[int32]map[chainhash.Hash]struct{} + + // notificationQueue is a concurrent unbounded queue that handles + // dispatching notifications to the subscriber of this client. + // + // TODO: Rather than leaving this as an unbounded queue for all types of + // notifications, try dropping ones where a later enqueued notification + // can fully invalidate one waiting to be processed. For example, + // BlockConnected notifications for greater block heights can remove the + // need to process earlier notifications still waiting to be processed. + notificationQueue *ConcurrentQueue + + // zmqTxNtfns is a channel through which ZMQ transaction events will be + // retrieved from the backing bitcoind connection. + zmqTxNtfns chan *wire.MsgTx + + // zmqBlockNtfns is a channel through which ZMQ block events will be + // retrieved from the backing bitcoind connection. + zmqBlockNtfns chan *wire.MsgBlock quit chan struct{} wg sync.WaitGroup } -// NewBitcoindClient creates a client connection to the server described by the -// connect string. If disableTLS is false, the remote RPC certificate must be -// provided in the certs slice. The connection is not established immediately, -// but must be done using the Start method. If the remote server does not -// operate on the same bitcoin network as described by the passed chain -// parameters, the connection will be disconnected. -func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, - zmqConnect string, zmqPollInterval time.Duration) (*BitcoindClient, - error) { - - client := &BitcoindClient{ - connConfig: &rpcclient.ConnConfig{ - Host: connect, - User: user, - Pass: pass, - DisableAutoReconnect: false, - DisableConnectOnNew: true, - DisableTLS: true, - HTTPPostMode: true, - }, - chainParams: chainParams, - zmqConnect: zmqConnect, - zmqPollInterval: zmqPollInterval, - rescanUpdate: make(chan interface{}), - watchOutPoints: make(map[wire.OutPoint]struct{}), - watchAddrs: make(map[string]struct{}), - watchTxIDs: make(map[chainhash.Hash]struct{}), - quit: make(chan struct{}), - memPool: make(map[chainhash.Hash]struct{}), - memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), - } - rpcClient, err := rpcclient.New(client.connConfig, nil) - if err != nil { - return nil, err - } - client.client = rpcClient - return client, nil -} - // BackEnd returns the name of the driver. func (c *BitcoindClient) BackEnd() string { return "bitcoind" } -// GetCurrentNet returns the network on which the bitcoind instance is running. -func (c *BitcoindClient) GetCurrentNet() (wire.BitcoinNet, error) { - hash, err := c.client.GetBlockHash(0) - if err != nil { - return 0, err - } - - switch *hash { - case *chaincfg.TestNet3Params.GenesisHash: - return chaincfg.TestNet3Params.Net, nil - case *chaincfg.RegressionNetParams.GenesisHash: - return chaincfg.RegressionNetParams.Net, nil - case *chaincfg.MainNetParams.GenesisHash: - return chaincfg.MainNetParams.Net, nil - default: - return 0, errors.New("unknown network") - } -} - // GetBestBlock returns the highest block known to bitcoind. func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { - bcinfo, err := c.client.GetBlockChainInfo() + bcinfo, err := c.chainConn.client.GetBlockChainInfo() if err != nil { return nil, 0, err } @@ -144,7 +131,7 @@ func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { // GetBlockHeight returns the height for the hash, if known, or returns an // error. func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { - header, err := c.GetBlockHeaderVerbose(hash) + header, err := c.chainConn.client.GetBlockHeaderVerbose(hash) if err != nil { return 0, err } @@ -153,49 +140,71 @@ func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { } // GetBlock returns a block from the hash. -func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, - error) { - return c.client.GetBlock(hash) +func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { + return c.chainConn.client.GetBlock(hash) } // GetBlockVerbose returns a verbose block from the hash. -func (c *BitcoindClient) GetBlockVerbose(hash *chainhash.Hash) ( - *btcjson.GetBlockVerboseResult, error) { - return c.client.GetBlockVerbose(hash) +func (c *BitcoindClient) GetBlockVerbose( + hash *chainhash.Hash) (*btcjson.GetBlockVerboseResult, error) { + + return c.chainConn.client.GetBlockVerbose(hash) } // GetBlockHash returns a block hash from the height. func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) { - return c.client.GetBlockHash(height) + return c.chainConn.client.GetBlockHash(height) } // GetBlockHeader returns a block header from the hash. func (c *BitcoindClient) GetBlockHeader( hash *chainhash.Hash) (*wire.BlockHeader, error) { - return c.client.GetBlockHeader(hash) + + return c.chainConn.client.GetBlockHeader(hash) } // GetBlockHeaderVerbose returns a block header from the hash. -func (c *BitcoindClient) GetBlockHeaderVerbose(hash *chainhash.Hash) ( - *btcjson.GetBlockHeaderVerboseResult, error) { - return c.client.GetBlockHeaderVerbose(hash) +func (c *BitcoindClient) GetBlockHeaderVerbose( + hash *chainhash.Hash) (*btcjson.GetBlockHeaderVerboseResult, error) { + + return c.chainConn.client.GetBlockHeaderVerbose(hash) } // GetRawTransactionVerbose returns a transaction from the tx hash. -func (c *BitcoindClient) GetRawTransactionVerbose(hash *chainhash.Hash) ( - *btcjson.TxRawResult, error) { - return c.client.GetRawTransactionVerbose(hash) +func (c *BitcoindClient) GetRawTransactionVerbose( + hash *chainhash.Hash) (*btcjson.TxRawResult, error) { + + return c.chainConn.client.GetRawTransactionVerbose(hash) } // GetTxOut returns a txout from the outpoint info provided. func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32, mempool bool) (*btcjson.GetTxOutResult, error) { - return c.client.GetTxOut(txHash, index, mempool) + + return c.chainConn.client.GetTxOut(txHash, index, mempool) } -// NotifyReceived updates the watch list with the passed addresses. +// SendRawTransaction sends a raw transaction via bitcoind. +func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, + allowHighFees bool) (*chainhash.Hash, error) { + + return c.chainConn.client.SendRawTransaction(tx, allowHighFees) +} + +// Notifications returns a channel to retrieve notifications from. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) Notifications() <-chan interface{} { + return c.notificationQueue.ChanOut() +} + +// NotifyReceived allows the chain backend to notify the caller whenever a +// transaction pays to any of the given addresses. +// +// NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { c.NotifyBlocks() + select { case c.rescanUpdate <- addrs: case <-c.quit: @@ -205,9 +214,11 @@ func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { return nil } -// NotifySpent updates the watch list with the passed outPoints. +// NotifySpent allows the chain backend to notify the caller whenever a +// transaction spends any of the given outpoints. func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { c.NotifyBlocks() + select { case c.rescanUpdate <- outPoints: case <-c.quit: @@ -217,9 +228,11 @@ func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { return nil } -// NotifyTxIDs updates the watch list with the passed TxIDs. -func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error { +// NotifyTx allows the chain backend to notify the caller whenever any of the +// given transactions confirm within the chain. +func (c *BitcoindClient) NotifyTx(txids []chainhash.Hash) error { c.NotifyBlocks() + select { case c.rescanUpdate <- txids: case <-c.quit: @@ -229,25 +242,33 @@ func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error { return nil } -// NotifyBlocks enables notifications. +// NotifyBlocks allows the chain backend to notify the caller whenever a block +// is connected or disconnected. +// +// NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) NotifyBlocks() error { - atomic.StoreUint32(&c.notify, 1) + atomic.StoreUint32(&c.notifyBlocks, 1) return nil } -// notifying returns true if notifications have been turned on; false otherwise. -func (c *BitcoindClient) notifying() bool { - return (atomic.LoadUint32(&c.notify) == 1) +// shouldNotifyBlocks determines whether the client should send block +// notifications to the caller. +func (c *BitcoindClient) shouldNotifyBlocks() bool { + return atomic.LoadUint32(&c.notifyBlocks) == 1 } -// LoadTxFilter updates the transaction watchlists for the client. Acceptable -// arguments after `reset` are any combination of []btcutil.Address, -// []wire.OutPoint, []*wire.OutPoint, []chainhash.Hash, -// map[wire.OutPoint]btcutil.Address, and []*chainhash.Hash. -func (c *BitcoindClient) LoadTxFilter(reset bool, - watchLists ...interface{}) error { - - // If we reset, signal that. +// LoadTxFilter uses the given filters to what we should match transactions +// against to determine if they are relevant to the client. The reset argument +// is used to reset the current filters. +// +// The current filters supported are of the following types: +// []btcutil.Address +// []wire.OutPoint +// []*wire.OutPoint +// map[wire.OutPoint]btcutil.Address +// []chainhash.Hash +// []*chainhash.Hash +func (c *BitcoindClient) LoadTxFilter(reset bool, filters ...interface{}) error { if reset { select { case c.rescanUpdate <- struct{}{}: @@ -256,12 +277,9 @@ func (c *BitcoindClient) LoadTxFilter(reset bool, } } - // This helper function will send an update to the filter. If the quit - // channel is closed, it will allow the outer loop below to finish, - // but skip over any updates as the quit case is triggered each time. - sendList := func(list interface{}) error { + updateFilter := func(filter interface{}) error { select { - case c.rescanUpdate <- list: + case c.rescanUpdate <- filter: case <-c.quit: return ErrBitcoindClientShuttingDown } @@ -269,33 +287,23 @@ func (c *BitcoindClient) LoadTxFilter(reset bool, return nil } - var err error - for _, watchList := range watchLists { - switch list := watchList.(type) { - - case map[wire.OutPoint]btcutil.Address: - err = sendList(list) - - case []wire.OutPoint: - err = sendList(list) - - case []*wire.OutPoint: - err = sendList(list) - - case []btcutil.Address: - err = sendList(list) - - case []chainhash.Hash: - err = sendList(list) - - case []*chainhash.Hash: - err = sendList(list) + // In order to make this operation atomic, we'll iterate through the + // filters twice: the first to ensure there aren't any unsupported + // filter types, and the second to actually update our filters. + for _, filter := range filters { + switch filter := filter.(type) { + case []btcutil.Address, []wire.OutPoint, []*wire.OutPoint, + map[wire.OutPoint]btcutil.Address, []chainhash.Hash, + []*chainhash.Hash: + // Proceed to check the next filter type. default: - return fmt.Errorf("unable to update filter: unknown "+ - "type %T", list) + return fmt.Errorf("unsupported filter type %T", filter) } - if err != nil { + } + + for _, filter := range filters { + if err := updateFilter(filter); err != nil { return err } } @@ -305,8 +313,8 @@ func (c *BitcoindClient) LoadTxFilter(reset bool, // RescanBlocks rescans any blocks passed, returning only the blocks that // matched as []btcjson.BlockDetails. -func (c *BitcoindClient) RescanBlocks(blockHashes []chainhash.Hash) ( - []btcjson.RescannedBlock, error) { +func (c *BitcoindClient) RescanBlocks( + blockHashes []chainhash.Hash) ([]btcjson.RescannedBlock, error) { rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes)) for _, hash := range blockHashes { @@ -324,50 +332,51 @@ func (c *BitcoindClient) RescanBlocks(blockHashes []chainhash.Hash) ( continue } - relevantTxes, err := c.filterBlock(block, header.Height, false) - if len(relevantTxes) > 0 { + relevantTxs, err := c.filterBlock(block, header.Height, false) + if len(relevantTxs) > 0 { rescannedBlock := btcjson.RescannedBlock{ Hash: hash.String(), } - for _, tx := range relevantTxes { + for _, tx := range relevantTxs { rescannedBlock.Transactions = append( rescannedBlock.Transactions, hex.EncodeToString(tx.SerializedTx), ) } - rescannedBlocks = append(rescannedBlocks, - rescannedBlock) + + rescannedBlocks = append(rescannedBlocks, rescannedBlock) } } + return rescannedBlocks, nil } // Rescan rescans from the block with the given hash until the current block, // after adding the passed addresses and outpoints to the client's watch list. func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, - addrs []btcutil.Address, outPoints map[wire.OutPoint]btcutil.Address) error { + addresses []btcutil.Address, outPoints map[wire.OutPoint]btcutil.Address) error { + // A block hash is required to use as the starting point of the rescan. if blockHash == nil { return errors.New("rescan requires a starting block hash") } - // Update addresses. + // We'll then update our filters with the given outpoints and addresses. select { - case c.rescanUpdate <- addrs: + case c.rescanUpdate <- addresses: case <-c.quit: return ErrBitcoindClientShuttingDown } - // Update outpoints. select { case c.rescanUpdate <- outPoints: case <-c.quit: return ErrBitcoindClientShuttingDown } - // Kick off the rescan with the starting block hash. + // Once the filters have been updated, we can begin the rescan. select { - case c.rescanUpdate <- blockHash: + case c.rescanUpdate <- *blockHash: case <-c.quit: return ErrBitcoindClientShuttingDown } @@ -375,99 +384,214 @@ func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, return nil } -// SendRawTransaction sends a raw transaction via bitcoind. -func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, - allowHighFees bool) (*chainhash.Hash, error) { - - return c.client.SendRawTransaction(tx, allowHighFees) -} - -// Start attempts to establish a client connection with the remote server. -// If successful, handler goroutines are started to process notifications -// sent by the server. After a limited number of connection attempts, this -// function gives up, and therefore will not block forever waiting for the -// connection to be established to a server that may not exist. +// Start initializes the bitcoind rescan client using the backing bitcoind +// connection and starts all goroutines necessary in order to process rescans +// and ZMQ notifications. +// +// NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) Start() error { if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { return nil } - // Verify that the server is running on the expected network. - net, err := c.GetCurrentNet() - if err != nil { - c.client.Disconnect() - return err - } - if net != c.chainParams.Net { - c.client.Disconnect() - return errors.New("mismatched networks") - } + // Start the notification queue and immediately dispatch a + // ClientConnected notification to the caller. This is needed as some of + // the callers will require this notification before proceeding. + c.notificationQueue.Start() + c.notificationQueue.ChanIn() <- ClientConnected{} - // Get initial conditions. + // Retrieve the best block of the chain. bestHash, bestHeight, err := c.GetBestBlock() if err != nil { - return err + return fmt.Errorf("unable to retrieve best block: %v", err) } bestHeader, err := c.GetBlockHeaderVerbose(bestHash) if err != nil { - return err + return fmt.Errorf("unable to retrieve header for best block: "+ + "%v", err) } + + c.bestBlockMtx.Lock() c.bestBlock = waddrmgr.BlockStamp{ - Height: bestHeight, Hash: *bestHash, + Height: bestHeight, Timestamp: time.Unix(bestHeader.Time, 0), } + c.bestBlockMtx.Unlock() - // Connect a ZMQ socket for block notifications - zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock", - "rawtx"}, c.zmqPollInterval) - if err != nil { - return err - } + // Once the client has started successfully, we'll include it in the set + // of rescan clients of the backing bitcoind connection in order to + // received ZMQ event notifications. + c.chainConn.AddClient(c) c.wg.Add(2) - go c.socketHandler(zmqClient) + go c.rescanHandler() + go c.ntfnHandler() + return nil } -// Stop disconnects the client and signals the shutdown of all goroutines -// started by Start. +// Stop stops the bitcoind rescan client from processing rescans and ZMQ +// notifications. +// +// NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) Stop() { if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { return } close(c.quit) - c.client.Shutdown() + + // Remove this client's reference from the bitcoind connection to + // prevent sending notifications to it after it's been stopped. + c.chainConn.RemoveClient(c.id) + c.notificationQueue.Stop() } -// WaitForShutdown blocks until both the client has finished disconnecting -// and all handlers have exited. +// WaitForShutdown blocks until the client has finished disconnecting and all +// handlers have exited. +// +// NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) WaitForShutdown() { - c.client.WaitForShutdown() c.wg.Wait() } -// Notifications returns a channel of parsed notifications sent by the remote -// bitcoin RPC server. This channel must be continually read or the process -// may abort for running out memory, as unread notifications are queued for -// later reads. -func (c *BitcoindClient) Notifications() <-chan interface{} { - return c.notificationQueue.ChanOut() +// rescanHandler handles the logic needed for the caller to trigger a chain +// rescan. +// +// NOTE: This must be called as a goroutine. +func (c *BitcoindClient) rescanHandler() { + defer c.wg.Done() + + for { + select { + case update := <-c.rescanUpdate: + switch update := update.(type) { + + // We're clearing the filters. + case struct{}: + c.watchMtx.Lock() + c.watchedOutPoints = make(map[wire.OutPoint]struct{}) + c.watchedAddresses = make(map[string]struct{}) + c.watchedTxs = make(map[chainhash.Hash]struct{}) + c.watchMtx.Unlock() + + // We're adding the addresses to our filter. + case []btcutil.Address: + c.watchMtx.Lock() + for _, addr := range update { + c.watchedAddresses[addr.String()] = struct{}{} + } + c.watchMtx.Unlock() + + // We're adding the outpoints to our filter. + case []wire.OutPoint: + c.watchMtx.Lock() + for _, op := range update { + c.watchedOutPoints[op] = struct{}{} + } + c.watchMtx.Unlock() + case []*wire.OutPoint: + c.watchMtx.Lock() + for _, op := range update { + c.watchedOutPoints[*op] = struct{}{} + } + c.watchMtx.Unlock() + + // We're adding the outpoints that map to the scripts + // that we should scan for to our filter. + case map[wire.OutPoint]btcutil.Address: + c.watchMtx.Lock() + for op := range update { + c.watchedOutPoints[op] = struct{}{} + } + c.watchMtx.Unlock() + + // We're adding the transactions to our filter. + case []chainhash.Hash: + c.watchMtx.Lock() + for _, txid := range update { + c.watchedTxs[txid] = struct{}{} + } + c.watchMtx.Unlock() + case []*chainhash.Hash: + c.watchMtx.Lock() + for _, txid := range update { + c.watchedTxs[*txid] = struct{}{} + } + c.watchMtx.Unlock() + + // We're starting a rescan from the hash. + case chainhash.Hash: + if err := c.rescan(update); err != nil { + log.Errorf("Unable to complete chain "+ + "rescan: %v", err) + } + default: + log.Warnf("Received unexpected filter type %T", + update) + } + case <-c.quit: + return + } + } } -// SetStartTime is a non-interface method to set the birthday of the wallet -// using this object. Since only a single rescan at a time is currently -// supported, only one birthday needs to be set. This does not fully restart a -// running rescan, so should not be used to update a rescan while it is running. -// TODO: When factoring out to multiple rescans per bitcoind client, add a -// birthday per client. -func (c *BitcoindClient) SetStartTime(startTime time.Time) { - c.clientMtx.Lock() - defer c.clientMtx.Unlock() +// ntfnHandler handles the logic to retrieve ZMQ notifications from the backing +// bitcoind connection. +// +// NOTE: This must be called as a goroutine. +func (c *BitcoindClient) ntfnHandler() { + defer c.wg.Done() - c.startTime = startTime + for { + select { + case tx := <-c.zmqTxNtfns: + if _, _, err := c.filterTx(tx, nil, true); err != nil { + log.Error(err) + } + case newBlock := <-c.zmqBlockNtfns: + // If the new block's previous hash matches the best + // hash known to us, then the new block is the next + // successor, so we'll update our best block to reflect + // this and determine if this new block matches any of + // our existing filters. + c.bestBlockMtx.Lock() + bestBlock := c.bestBlock + c.bestBlockMtx.Unlock() + if newBlock.Header.PrevBlock == bestBlock.Hash { + newBlockHeight := bestBlock.Height + 1 + _, err := c.filterBlock( + newBlock, newBlockHeight, true, + ) + if err != nil { + log.Error(err) + continue + } + + // With the block succesfully filtered, we'll + // make it our new best block. + bestBlock.Hash = newBlock.BlockHash() + bestBlock.Height = newBlockHeight + bestBlock.Timestamp = newBlock.Header.Timestamp + + c.bestBlockMtx.Lock() + c.bestBlock = bestBlock + c.bestBlockMtx.Unlock() + + continue + } + + // Otherwise, we've encountered a reorg. + if err := c.reorg(bestBlock, newBlock); err != nil { + log.Errorf("Unable to process chain reorg: %v", + err) + } + case <-c.quit: + return + } + } } // BlockStamp returns the latest block notified by the client, or an error @@ -480,31 +604,34 @@ func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { return &bestBlock, nil } -func (c *BitcoindClient) onClientConnect() { - select { - case c.notificationQueue.ChanIn() <- ClientConnected{}: - case <-c.quit: - } -} +// onBlockConnected is a callback that's executed whenever a new block has been +// detected. This will queue a BlockConnected notification to the caller. +func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, + timestamp time.Time) { -func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { - if c.notifying() { + if c.shouldNotifyBlocks() { select { case c.notificationQueue.ChanIn() <- BlockConnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, }, - Time: time, + Time: timestamp, }: case <-c.quit: } } } +// onFilteredBlockConnected is an alternative callback that's executed whenever +// a new block has been detected. It serves the same purpose as +// onBlockConnected, but it also includes a list of the relevant transactions +// found within the block being connected. This will queue a +// FilteredBlockConnected notification to the caller. func (c *BitcoindClient) onFilteredBlockConnected(height int32, header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { - if c.notifying() { + + if c.shouldNotifyBlocks() { select { case c.notificationQueue.ChanIn() <- FilteredBlockConnected{ Block: &wtxmgr.BlockMeta{ @@ -521,296 +648,201 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32, } } -func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) { - if c.notifying() { +// onBlockDisconnected is a callback that's executed whenever a block has been +// disconnected. This will queue a BlockDisconnected notification to the caller +// with the details of the block being disconnected. +func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, + timestamp time.Time) { + + if c.shouldNotifyBlocks() { select { case c.notificationQueue.ChanIn() <- BlockDisconnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, }, - Time: time, + Time: timestamp, }: case <-c.quit: } } } -func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord, - block *btcjson.BlockDetails) { - blk, err := parseBlock(block) +// onRelevantTx is a callback that's executed whenever a transaction is relevant +// to the caller. This means that the transaction matched a specific item in the +// client's different filters. This will queue a RelevantTx notification to the +// caller. +func (c *BitcoindClient) onRelevantTx(tx *wtxmgr.TxRecord, + blockDetails *btcjson.BlockDetails) { + + block, err := parseBlock(blockDetails) if err != nil { - // Log and drop improper notification. - log.Errorf("recvtx notification bad block: %v", err) + log.Errorf("Unable to send onRelevantTx notification, failed "+ + "parse block: %v", err) return } select { - case c.notificationQueue.ChanIn() <- RelevantTx{rec, blk}: + case c.notificationQueue.ChanIn() <- RelevantTx{ + TxRecord: tx, + Block: block, + }: case <-c.quit: } } -func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) { +// onRescanProgress is a callback that's executed whenever a rescan is in +// progress. This will queue a RescanProgress notification to the caller with +// the current rescan progress details. +func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, + timestamp time.Time) { + select { - case c.notificationQueue.ChanIn() <- &RescanProgress{hash, height, blkTime}: + case c.notificationQueue.ChanIn() <- &RescanProgress{ + Hash: hash, + Height: height, + Time: timestamp, + }: case <-c.quit: } } -func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) { +// onRescanFinished is a callback that's executed whenever a rescan has +// finished. This will queue a RescanFinished notification to the caller with +// the details of the last block in the range of the rescan. +func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, + timestamp time.Time) { + log.Infof("Rescan finished at %d (%s)", height, hash) + select { - case c.notificationQueue.ChanIn() <- &RescanFinished{hash, height, blkTime}: + case c.notificationQueue.ChanIn() <- &RescanFinished{ + Hash: hash, + Height: height, + Time: timestamp, + }: case <-c.quit: } - -} - -// socketHandler reads events from the ZMQ socket, processes them as -// appropriate, and queues them as btcd or neutrino would. -func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) { - defer c.wg.Done() - defer zmqClient.Close() - - log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect) - c.onClientConnect() - -mainLoop: - for { - selectLoop: - for { - // Check for any requests before we poll events from - // bitcoind. - select { - - // Quit if requested - case <-c.quit: - return - - // Update our monitored watchlists or do a rescan. - case event := <-c.rescanUpdate: - switch e := event.(type) { - - // We're clearing the watchlists. - case struct{}: - c.clientMtx.Lock() - c.watchAddrs = make(map[string]struct{}) - c.watchTxIDs = make(map[chainhash.Hash]struct{}) - c.watchOutPoints = - make(map[wire.OutPoint]struct{}) - c.clientMtx.Unlock() - - // We're updating monitored addresses. - case []btcutil.Address: - c.clientMtx.Lock() - for _, addr := range e { - c.watchAddrs[addr.EncodeAddress()] = - struct{}{} - } - c.clientMtx.Unlock() - - // We're updating monitored outpoints from - // pointers. - case []*wire.OutPoint: - c.clientMtx.Lock() - for _, op := range e { - c.watchOutPoints[*op] = struct{}{} - } - c.clientMtx.Unlock() - case []wire.OutPoint: - c.clientMtx.Lock() - for _, op := range e { - c.watchOutPoints[op] = struct{}{} - } - c.clientMtx.Unlock() - - // We're updating monitored outpoints that map - // to the scripts that we should scan for. - case map[wire.OutPoint]btcutil.Address: - c.clientMtx.Lock() - for op := range e { - c.watchOutPoints[op] = struct{}{} - } - c.clientMtx.Unlock() - - // We're adding monitored TXIDs. - case []*chainhash.Hash: - c.clientMtx.Lock() - for _, txid := range e { - c.watchTxIDs[*txid] = struct{}{} - } - c.clientMtx.Unlock() - case []chainhash.Hash: - c.clientMtx.Lock() - for _, txid := range e { - c.watchTxIDs[txid] = struct{}{} - } - c.clientMtx.Unlock() - - // We're rescanning from the passed hash. - case *chainhash.Hash: - if err := c.rescan(e); err != nil { - log.Errorf("rescan failed: %s", - err) - } - } - default: - break selectLoop - } - } - - // Now, poll events from bitcoind. - msgBytes, err := zmqClient.Receive() - if err != nil { - switch e := err.(type) { - case net.Error: - if !e.Timeout() { - log.Error(err) - } - default: - log.Error(err) - } - continue mainLoop - } - - // We have an event! - switch string(msgBytes[0]) { - - // We have a transaction, so process it. - case "rawtx": - tx := &wire.MsgTx{} - err = tx.Deserialize(bytes.NewBuffer(msgBytes[1])) - if err != nil { - log.Error(err) - continue mainLoop - } - // filterTx automatically detects whether this tx has - // been mined and responds appropriately. - _, _, err := c.filterTx(tx, nil, true) - if err != nil { - log.Error(err) - } - - // We have a raw block, so we process it. - case "rawblock": - block := &wire.MsgBlock{} - err = block.Deserialize(bytes.NewBuffer(msgBytes[1])) - if err != nil { - log.Error(err) - continue mainLoop - } - - // Check if the block is logically next. If not, we - // have a reorg. - c.bestBlockMtx.Lock() - bestBlock := c.bestBlock - if block.Header.PrevBlock == bestBlock.Hash { - // No reorg. Notify the subscriber of the block. - bestBlock.Hash = block.BlockHash() - bestBlock.Height++ - bestBlock.Timestamp = block.Header.Timestamp - _, err = c.filterBlock(block, bestBlock.Height, true) - if err != nil { - log.Error(err) - } - c.bestBlockMtx.Unlock() - continue mainLoop - } - c.bestBlockMtx.Unlock() - - // We have a reorg. - err = c.reorg(bestBlock, block) - if err != nil { - log.Errorf("Error during reorg: %v", err) - } - - // Our event is not a block or other type we're - // watching, so we ignore it. - default: - } - } } // reorg processes a reorganization during chain synchronization. This is -// separate from a rescan's handling of a reorg. -func (c *BitcoindClient) reorg(bs waddrmgr.BlockStamp, block *wire.MsgBlock) error { - // We rewind until we find a common ancestor between the known chain - //and the current chain, and then fast forward again. This relies on - // being able to fetch both from bitcoind; to change that would require - // changes in downstream code. - // TODO: Make this more robust in order not to rely on this behavior. - log.Debugf("Possible reorg at block %s", block.BlockHash()) - knownHeader, err := c.GetBlockHeader(&bs.Hash) - if err != nil { - return err - } +// separate from a rescan's handling of a reorg. This will rewind back until it +// finds a common ancestor and notify all the new blocks since then. +func (c *BitcoindClient) reorg(currentBlock waddrmgr.BlockStamp, + reorgBlock *wire.MsgBlock) error { - // We also get the best known height based on the block which was - // notified. This way, we can preserve the chain of blocks we need to + log.Debugf("Possible reorg at block %s", reorgBlock.BlockHash()) + + // Retrieve the best known height based on the block which caused the + // reorg. This way, we can preserve the chain of blocks we need to // retrieve. - bestHash := block.BlockHash() + bestHash := reorgBlock.BlockHash() bestHeight, err := c.GetBlockHeight(&bestHash) if err != nil { return err } - if bestHeight < bs.Height { - log.Debug("multiple reorgs in a row") + + if bestHeight < currentBlock.Height { + log.Debug("Detected multiple reorgs") return nil } - // We track the block headers from the notified block to the current - // block at the known block height. This will let us fast-forward - // despite any future reorgs. - var reorgBlocks list.List - reorgBlocks.PushFront(block) - for i := bestHeight - 1; i >= bs.Height; i-- { - block, err = c.GetBlock(&block.Header.PrevBlock) + // We'll now keep track of all the blocks known to the *chain*, starting + // from the best block known to us until the best block in the chain. + // This will let us fast-forward despite any future reorgs. + blocksToNotify := list.New() + blocksToNotify.PushFront(reorgBlock) + previousBlock := reorgBlock.Header.PrevBlock + for i := bestHeight - 1; i >= currentBlock.Height; i-- { + block, err := c.GetBlock(&previousBlock) if err != nil { return err } - reorgBlocks.PushFront(block) + blocksToNotify.PushFront(block) + previousBlock = block.Header.PrevBlock } - // Now we rewind back to the last common ancestor block, using the - // prevblock hash from each header to avoid any race conditions. If we - // get more reorgs, they'll be queued and we'll repeat the cycle. - for block.Header.PrevBlock != knownHeader.PrevBlock { - log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash) - c.onBlockDisconnected(&bs.Hash, bs.Height, - knownHeader.Timestamp) - bs.Height-- - bs.Hash = knownHeader.PrevBlock - block, err = c.GetBlock(&block.Header.PrevBlock) - if err != nil { - return err - } - reorgBlocks.PushFront(block) - knownHeader, err = c.GetBlockHeader(&knownHeader.PrevBlock) - if err != nil { - return err - } - bs.Timestamp = knownHeader.Timestamp + // Rewind back to the last common ancestor block using the previous + // block hash from each header to avoid any race conditions. If we + // encounter more reorgs, they'll be queued and we'll repeat the cycle. + // + // We'll start by retrieving the header to the best block known to us. + currentHeader, err := c.GetBlockHeader(¤tBlock.Hash) + if err != nil { + return err } - // Disconnect the last block from the old chain. Since the PrevBlock is - // equal between the old and new chains, the tip will now be the last - // common ancestor. - log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash) - c.onBlockDisconnected(&bs.Hash, bs.Height, knownHeader.Timestamp) - bs.Height-- + // Then, we'll walk backwards in the chain until we find our common + // ancestor. + for previousBlock != currentHeader.PrevBlock { + // Since the previous hashes don't match, the current block has + // been reorged out of the chain, so we should send a + // BlockDisconnected notification for it. + log.Debugf("Disconnecting block %d (%v)", currentBlock.Height, + currentBlock.Hash) - // Now we fast-forward to the notified block, notifying along the way. - for reorgBlocks.Front() != nil { - block = reorgBlocks.Front().Value.(*wire.MsgBlock) - bs.Height++ - bs.Hash = block.BlockHash() - c.filterBlock(block, bs.Height, true) - reorgBlocks.Remove(reorgBlocks.Front()) + c.onBlockDisconnected( + ¤tBlock.Hash, currentBlock.Height, + currentBlock.Timestamp, + ) + + // Our current block should now reflect the previous one to + // continue the common ancestor search. + currentHeader, err = c.GetBlockHeader(¤tHeader.PrevBlock) + if err != nil { + return err + } + + currentBlock.Height-- + currentBlock.Hash = currentHeader.PrevBlock + currentBlock.Timestamp = currentHeader.Timestamp + + // Store the correct block in our list in order to notify it + // once we've found our common ancestor. + block, err := c.GetBlock(&previousBlock) + if err != nil { + return err + } + blocksToNotify.PushFront(block) + previousBlock = block.Header.PrevBlock + } + + // Disconnect the last block from the old chain. Since the previous + // block remains the same between the old and new chains, the tip will + // now be the last common ancestor. + log.Debugf("Disconnecting block %d (%v)", currentBlock.Height, + currentBlock.Hash) + + c.onBlockDisconnected( + ¤tBlock.Hash, currentBlock.Height, currentHeader.Timestamp, + ) + + currentBlock.Height-- + + // Now we fast-forward to the new block, notifying along the way. + for blocksToNotify.Front() != nil { + nextBlock := blocksToNotify.Front().Value.(*wire.MsgBlock) + nextHeight := currentBlock.Height + 1 + nextHash := nextBlock.BlockHash() + nextHeader, err := c.GetBlockHeader(&nextHash) + if err != nil { + return err + } + + _, err = c.filterBlock(nextBlock, nextHeight, true) + if err != nil { + return err + } + + currentBlock.Height = nextHeight + currentBlock.Hash = nextHash + currentBlock.Timestamp = nextHeader.Timestamp + + blocksToNotify.Remove(blocksToNotify.Front()) } c.bestBlockMtx.Lock() - c.bestBlock = bs + c.bestBlock = currentBlock c.bestBlockMtx.Unlock() return nil @@ -821,6 +853,8 @@ func (c *BitcoindClient) reorg(bs waddrmgr.BlockStamp, block *wire.MsgBlock) err // returning a FilterBlocksReponse for the first block containing a matching // address. If no matches are found in the range of blocks requested, the // returned response will be nil. +// +// NOTE: This is part of the chain.Interface interface. func (c *BitcoindClient) FilterBlocks( req *FilterBlocksRequest) (*FilterBlocksResponse, error) { @@ -832,7 +866,7 @@ func (c *BitcoindClient) FilterBlocks( for i, block := range req.Blocks { // TODO(conner): add prefetching, since we already know we'll be // fetching *every* block - rawBlock, err := c.client.GetBlock(&block.Hash) + rawBlock, err := c.GetBlock(&block.Hash) if err != nil { return nil, err } @@ -862,17 +896,18 @@ func (c *BitcoindClient) FilterBlocks( return nil, nil } -// rescan performs a rescan of the chain using a bitcoind back-end, from the -// specified hash to the best-known hash, while watching out for reorgs that -// happen during the rescan. It uses the addresses and outputs being tracked -// by the client in the watch list. This is called only within a queue -// processing loop. -func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { - // We start by getting the best already-processed block. We only use +// rescan performs a rescan of the chain using a bitcoind backend, from the +// specified hash to the best known hash, while watching out for reorgs that +// happen during the rescan. It uses the addresses and outputs being tracked by +// the client in the watch list. This is called only within a queue processing +// loop. +func (c *BitcoindClient) rescan(start chainhash.Hash) error { + log.Infof("Starting rescan from block %s", start) + + // We start by getting the best already processed block. We only use // the height, as the hash can change during a reorganization, which we // catch by testing connectivity from known blocks to the previous // block. - log.Infof("Starting rescan from block %s", hash) bestHash, bestHeight, err := c.GetBestBlock() if err != nil { return err @@ -881,52 +916,47 @@ func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { if err != nil { return err } - bestBlock := &waddrmgr.BlockStamp{ + bestBlock := waddrmgr.BlockStamp{ Hash: *bestHash, Height: bestHeight, Timestamp: time.Unix(bestHeader.Time, 0), } - lastHeader, err := c.GetBlockHeaderVerbose(hash) - if err != nil { - return err - } - lastHash, err := chainhash.NewHashFromStr(lastHeader.Hash) - if err != nil { - return err - } - firstHeader := lastHeader + // Create a list of headers sorted in forward order. We'll use this in + // the event that we need to backtrack due to a chain reorg. headers := list.New() - headers.PushBack(lastHeader) + previousHeader, err := c.GetBlockHeaderVerbose(&start) + if err != nil { + return err + } + previousHash, err := chainhash.NewHashFromStr(previousHeader.Hash) + if err != nil { + return err + } + headers.PushBack(previousHeader) - // We always send a RescanFinished message when we're done. - defer func() { - c.onRescanFinished(lastHash, lastHeader.Height, time.Unix( - lastHeader.Time, 0)) - }() + // Queue a RescanFinished notification to the caller with the last block + // processed throughout the rescan once done. + defer c.onRescanFinished( + previousHash, previousHeader.Height, + time.Unix(previousHeader.Time, 0), + ) // Cycle through all of the blocks known to bitcoind, being mindful of // reorgs. - for i := firstHeader.Height + 1; i <= bestBlock.Height; i++ { - // Get the block at the current height. + for i := previousHeader.Height + 1; i <= bestBlock.Height; i++ { hash, err := c.GetBlockHash(int64(i)) if err != nil { return err } - // This relies on the fact that bitcoind returns blocks from - // non-best chains it knows about. - // TODO: Make this more robust in order to not rely on this - // behavior. - // - // If the last known header isn't after the wallet birthday, - // try only fetching the next header and constructing a dummy - // block. If, in this event, the next header's timestamp is - // after the wallet birthday, go ahead and fetch the full block. + // If the previous header is before the wallet birthday, fetch + // the current header and construct a dummy block, rather than + // fetching the whole block itself. This speeds things up as we + // no longer have to fetch the whole block when we know it won't + // match any of our filters. var block *wire.MsgBlock - c.clientMtx.RLock() - afterBirthday := lastHeader.Time >= c.startTime.Unix() - c.clientMtx.RUnlock() + afterBirthday := previousHeader.Time >= c.birthday.Unix() if !afterBirthday { header, err := c.GetBlockHeader(hash) if err != nil { @@ -935,13 +965,14 @@ func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { block = &wire.MsgBlock{ Header: *header, } - c.clientMtx.RLock() - afterBirthday = c.startTime.Before(header.Timestamp) + + afterBirthday = c.birthday.Before(header.Timestamp) if afterBirthday { - c.onRescanProgress(lastHash, i, - block.Header.Timestamp) + c.onRescanProgress( + previousHash, i, + block.Header.Timestamp, + ) } - c.clientMtx.RUnlock() } if afterBirthday { @@ -951,81 +982,88 @@ func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { } } - for block.Header.PrevBlock.String() != lastHeader.Hash { + for block.Header.PrevBlock.String() != previousHeader.Hash { // If we're in this for loop, it looks like we've been // reorganized. We now walk backwards to the common // ancestor between the best chain and the known chain. // // First, we signal a disconnected block to rewind the // rescan state. - c.onBlockDisconnected(lastHash, lastHeader.Height, - time.Unix(lastHeader.Time, 0)) + c.onBlockDisconnected( + previousHash, previousHeader.Height, + time.Unix(previousHeader.Time, 0), + ) - // Next, we get the previous block of the best chain. - hash, err = c.GetBlockHash(int64(i - 1)) + // Get the previous block of the best chain. + hash, err := c.GetBlockHash(int64(i - 1)) if err != nil { return err } - block, err = c.GetBlock(hash) if err != nil { return err } - // Then, we get the previous header for the known chain. + // Then, we'll the get the header of this previous + // block. if headers.Back() != nil { // If it's already in the headers list, we can // just get it from there and remove the - // current hash). + // current hash. headers.Remove(headers.Back()) if headers.Back() != nil { - lastHeader = headers.Back(). - Value.(*btcjson. - GetBlockHeaderVerboseResult) - lastHash, err = chainhash. - NewHashFromStr(lastHeader.Hash) + previousHeader = headers.Back(). + Value.(*btcjson.GetBlockHeaderVerboseResult) + previousHash, err = chainhash.NewHashFromStr( + previousHeader.Hash, + ) if err != nil { return err } } } else { // Otherwise, we get it from bitcoind. - lastHash, err = chainhash.NewHashFromStr( - lastHeader.PreviousHash) + previousHash, err = chainhash.NewHashFromStr( + previousHeader.PreviousHash, + ) if err != nil { return err } - lastHeader, err = c.GetBlockHeaderVerbose( - lastHash) + previousHeader, err = c.GetBlockHeaderVerbose( + previousHash, + ) if err != nil { return err } } } - // We are at the latest known block, so we notify. - lastHeader = &btcjson.GetBlockHeaderVerboseResult{ - Hash: block.BlockHash().String(), + // Now that we've ensured we haven't come across a reorg, we'll + // add the current block header to our list of headers. + blockHash := block.BlockHash() + previousHash = &blockHash + previousHeader = &btcjson.GetBlockHeaderVerboseResult{ + Hash: blockHash.String(), Height: i, PreviousHash: block.Header.PrevBlock.String(), Time: block.Header.Timestamp.Unix(), } - blockHash := block.BlockHash() - lastHash = &blockHash - headers.PushBack(lastHeader) + headers.PushBack(previousHeader) - _, err = c.filterBlock(block, i, true) - if err != nil { + // Notify the block and any of its relevant transacations. + if _, err = c.filterBlock(block, i, true); err != nil { return err } if i%10000 == 0 { - c.onRescanProgress(lastHash, i, block.Header.Timestamp) + c.onRescanProgress( + previousHash, i, block.Header.Timestamp, + ) } - // If we've reached the previously best-known block, check to + // If we've reached the previously best known block, check to // make sure the underlying node hasn't synchronized additional - // blocks. If it has, update the best-known block and continue + // blocks. If it has, update the best known block and continue // to rescan to that point. if i == bestBlock.Height { bestHash, bestHeight, err = c.GetBestBlock() @@ -1036,11 +1074,10 @@ func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { if err != nil { return err } - bestBlock = &waddrmgr.BlockStamp{ - Hash: *bestHash, - Height: bestHeight, - Timestamp: time.Unix(bestHeader.Time, 0), - } + + bestBlock.Hash = *bestHash + bestBlock.Height = bestHeight + bestBlock.Timestamp = time.Unix(bestHeader.Time, 0) } } @@ -1051,22 +1088,20 @@ func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { // any matching transactions, sending notifications along the way. func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, notify bool) ([]*wtxmgr.TxRecord, error) { - // If we're earlier than wallet birthday, don't do any notifications. - c.clientMtx.RLock() - startTime := c.startTime - c.clientMtx.RUnlock() - if block.Header.Timestamp.Before(startTime) { + + // If this block happened before the client's birthday, then we'll skip + // it entirely. + if block.Header.Timestamp.Before(c.birthday) { return nil, nil } - // Only mention that we're filtering a block if the client wallet has - // started monitoring the chain. - if !c.notifying() { + if c.shouldNotifyBlocks() { log.Debugf("Filtering block %d (%s) with %d transactions", height, block.BlockHash(), len(block.Transactions)) } - // Create block details for notifications. + // Create a block details template to use for all of the confirmed + // transactions found within this block. blockHash := block.BlockHash() blockDetails := &btcjson.BlockDetails{ Hash: blockHash.String(), @@ -1074,35 +1109,39 @@ func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, Time: block.Header.Timestamp.Unix(), } - // Cycle through all transactions in the block. + // Now, we'll through all of the transactions in the block keeping track + // of any relevant to the caller. var relevantTxs []*wtxmgr.TxRecord - blockConfirmed := make(map[chainhash.Hash]struct{}) + confirmedTxs := make(map[chainhash.Hash]struct{}) for i, tx := range block.Transactions { - // Update block and tx details for notifications. + // Update the index in the block details with the index of this + // transaction. blockDetails.Index = i - found, rec, err := c.filterTx(tx, blockDetails, notify) + isRelevant, rec, err := c.filterTx(tx, blockDetails, notify) if err != nil { - log.Warnf("Unable to filter tx: %v", err) + log.Warnf("Unable to filter transaction %v: %v", + tx.TxHash(), err) continue } - if found { + + if isRelevant { relevantTxs = append(relevantTxs, rec) - blockConfirmed[tx.TxHash()] = struct{}{} + confirmedTxs[tx.TxHash()] = struct{}{} } } // Update the expiration map by setting the block's confirmed // transactions and deleting any in the mempool that were confirmed // over 288 blocks ago. - c.clientMtx.Lock() - c.memPoolExp[height] = blockConfirmed - if oldBlock, ok := c.memPoolExp[height-288]; ok { + c.watchMtx.Lock() + c.expiredMempool[height] = confirmedTxs + if oldBlock, ok := c.expiredMempool[height-288]; ok { for txHash := range oldBlock { - delete(c.memPool, txHash) + delete(c.mempool, txHash) } - delete(c.memPoolExp, height-288) + delete(c.expiredMempool, height-288) } - c.clientMtx.Unlock() + c.watchMtx.Unlock() if notify { c.onFilteredBlockConnected(height, &block.Header, relevantTxs) @@ -1112,10 +1151,11 @@ func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, return relevantTxs, nil } -// filterTx filters a single transaction against the client's watch list. +// filterTx determines whether a transaction is relevant to the client by +// inspecting the client's different filters. func (c *BitcoindClient) filterTx(tx *wire.MsgTx, - blockDetails *btcjson.BlockDetails, notify bool) (bool, - *wtxmgr.TxRecord, error) { + blockDetails *btcjson.BlockDetails, + notify bool) (bool, *wtxmgr.TxRecord, error) { txDetails := btcutil.NewTx(tx) if blockDetails != nil { @@ -1132,68 +1172,75 @@ func (c *BitcoindClient) filterTx(tx *wire.MsgTx, rec.Received = time.Unix(blockDetails.Time, 0) } - var notifyTx bool + // We'll begin the filtering process by holding the lock to ensure we + // match exactly against what's currently in the filters. + c.watchMtx.Lock() + defer c.watchMtx.Unlock() - // If we already know this is a relevant tx from a previous ntfn, we - // can shortcut the filter process and let the caller know the filter - // matches. - c.clientMtx.RLock() - if _, ok := c.memPool[tx.TxHash()]; ok { - c.clientMtx.RUnlock() + // If we've already seen this transaction and it's now been confirmed, + // then we'll shortcut the filter process by immediately sending a + // notification to the caller that the filter matches. + if _, ok := c.mempool[tx.TxHash()]; ok { if notify && blockDetails != nil { c.onRelevantTx(rec, blockDetails) } return true, rec, nil } - c.clientMtx.RUnlock() - // Cycle through outputs and check if we've matched a known address. - // Add any matched outpoints to watchOutPoints. + // Otherwise, this is a new transaction we have yet to see. We'll need + // to determine if this transaction is somehow relevant to the caller. + var isRelevant bool + + // We'll start by cycling through its outputs to determine if it pays to + // any of the currently watched addresses. If an output matches, we'll + // add it to our watch list. for i, out := range tx.TxOut { _, addrs, _, err := txscript.ExtractPkScriptAddrs( - out.PkScript, c.chainParams) + out.PkScript, c.chainParams, + ) if err != nil { - log.Debugf("Couldn't parse output script in %s:%d: %v", + log.Debugf("Unable to parse output script in %s:%d: %v", tx.TxHash(), i, err) continue } + for _, addr := range addrs { - c.clientMtx.RLock() - if _, ok := c.watchAddrs[addr.EncodeAddress()]; ok { - notifyTx = true - c.watchOutPoints[wire.OutPoint{ + if _, ok := c.watchedAddresses[addr.String()]; ok { + isRelevant = true + op := wire.OutPoint{ Hash: tx.TxHash(), Index: uint32(i), - }] = struct{}{} + } + c.watchedOutPoints[op] = struct{}{} } - c.clientMtx.RUnlock() } } - // If an output hasn't already matched, see if an input will. - if !notifyTx { + // If the transaction didn't pay to any of our watched addresses, we'll + // check if it spends any of our watched outpoints. + if !isRelevant { for _, in := range tx.TxIn { - c.clientMtx.RLock() - if _, ok := c.watchOutPoints[in.PreviousOutPoint]; ok { - c.clientMtx.RUnlock() - notifyTx = true + if _, ok := c.watchedOutPoints[in.PreviousOutPoint]; ok { + isRelevant = true break } - c.clientMtx.RUnlock() } } - // If we have a match and it's not mined, notify the TX. If the TX is - // mined, we notify as part of FilteredBlockConnected. The boolean map - // value will let us know if we last saw it as mined or unmined. - if notifyTx { - c.clientMtx.Lock() - if _, ok := c.memPool[tx.TxHash()]; blockDetails == nil || !ok { - c.onRelevantTx(rec, blockDetails) - } - c.memPool[tx.TxHash()] = struct{}{} - c.clientMtx.Unlock() + // If the transaction is not relevant to us, we can simply exit. + if !isRelevant { + return false, rec, nil } - return notifyTx, rec, nil + // Otherwise, the transaction matched our filters, so we should dispatch + // a notification for it. If it's still unconfirmed, we'll include it in + // our mempool so that it can also be notified as part of + // FilteredBlockConnected once it confirms. + if blockDetails == nil { + c.mempool[tx.TxHash()] = struct{}{} + } + + c.onRelevantTx(rec, blockDetails) + + return true, rec, nil } diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go new file mode 100644 index 0000000..963afe3 --- /dev/null +++ b/chain/bitcoind_conn.go @@ -0,0 +1,362 @@ +package chain + +import ( + "bytes" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/gozmq" +) + +// BitcoindConn represents a persistent client connection to a bitcoind node +// that listens for events read from a ZMQ connection. +type BitcoindConn struct { + started int32 // To be used atomically. + stopped int32 // To be used atomically. + + // rescanClientCounter is an atomic counter that assigns a unique ID to + // each new bitcoind rescan client using the current bitcoind + // connection. + rescanClientCounter uint64 + + // chainParams identifies the current network the bitcoind node is + // running on. + chainParams *chaincfg.Params + + // client is the RPC client to the bitcoind node. + client *rpcclient.Client + + // zmqBlockHost is the host listening for ZMQ connections that will be + // responsible for delivering raw transaction events. + zmqBlockHost string + + // zmqTxHost is the host listening for ZMQ connections that will be + // responsible for delivering raw transaction events. + zmqTxHost string + + // zmqPollInterval is the interval at which we'll attempt to retrieve an + // event from the ZMQ connection. + zmqPollInterval time.Duration + + // rescanClients is the set of active bitcoind rescan clients to which + // ZMQ event notfications will be sent to. + rescanClientsMtx sync.Mutex + rescanClients map[uint64]*BitcoindClient + + quit chan struct{} + wg sync.WaitGroup +} + +// NewBitcoindConn creates a client connection to the node described by the host +// string. The connection is not established immediately, but must be done using +// the Start method. If the remote node does not operate on the same bitcoin +// network as described by the passed chain parameters, the connection will be +// disconnected. +func NewBitcoindConn(chainParams *chaincfg.Params, + host, user, pass, zmqBlockHost, zmqTxHost string, + zmqPollInterval time.Duration) (*BitcoindConn, error) { + + clientCfg := &rpcclient.ConnConfig{ + Host: host, + User: user, + Pass: pass, + DisableAutoReconnect: false, + DisableConnectOnNew: true, + DisableTLS: true, + HTTPPostMode: true, + } + + client, err := rpcclient.New(clientCfg, nil) + if err != nil { + return nil, err + } + + conn := &BitcoindConn{ + chainParams: chainParams, + client: client, + zmqBlockHost: zmqBlockHost, + zmqTxHost: zmqTxHost, + zmqPollInterval: zmqPollInterval, + rescanClients: make(map[uint64]*BitcoindClient), + quit: make(chan struct{}), + } + + return conn, nil +} + +// Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If +// successful, a goroutine is spawned to read events from the ZMQ connection. +// It's possible for this function to fail due to a limited number of connection +// attempts. This is done to prevent waiting forever on the connection to be +// established in the case that the node is down. +func (c *BitcoindConn) Start() error { + if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { + return nil + } + + // Verify that the node is running on the expected network. + net, err := c.getCurrentNet() + if err != nil { + c.client.Disconnect() + return err + } + if net != c.chainParams.Net { + c.client.Disconnect() + return fmt.Errorf("expected network %v, got %v", + c.chainParams.Net, net) + } + + // Establish two different ZMQ connections to bitcoind to retrieve block + // and transaction event notifications. We'll use two as a separation of + // concern to ensure one type of event isn't dropped from the connection + // queue due to another type of event filling it up. + zmqBlockConn, err := gozmq.Subscribe( + c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval, + ) + if err != nil { + c.client.Disconnect() + return fmt.Errorf("unable to subscribe for zmq block events: "+ + "%v", err) + } + + zmqTxConn, err := gozmq.Subscribe( + c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval, + ) + if err != nil { + c.client.Disconnect() + return fmt.Errorf("unable to subscribe for zmq tx events: %v", + err) + } + + c.wg.Add(2) + go c.blockEventHandler(zmqBlockConn) + go c.txEventHandler(zmqTxConn) + + return nil +} + +// Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any +// active rescan clients. +func (c *BitcoindConn) Stop() { + if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { + return + } + + for _, client := range c.rescanClients { + client.Stop() + } + + close(c.quit) + c.client.Shutdown() + + c.client.WaitForShutdown() + c.wg.Wait() +} + +// blockEventHandler reads raw blocks events from the ZMQ block socket and +// forwards them along to the current rescan clients. +// +// NOTE: This must be run as a goroutine. +func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { + defer c.wg.Done() + defer conn.Close() + + log.Info("Started listening for bitcoind block notifications via ZMQ ", + "on", c.zmqBlockHost) + + for { + // Before attempting to read from the ZMQ socket, we'll make + // sure to check if we've been requested to shut down. + select { + case <-c.quit: + return + default: + } + + // Poll an event from the ZMQ socket. It's possible that the + // connection to the socket continuously times out, so we'll + // prevent logging this error to prevent spamming the logs. + msgBytes, err := conn.Receive() + if err != nil { + err, ok := err.(net.Error) + if !ok || !err.Timeout() { + log.Error(err) + } + + continue + } + + // We have an event! We'll now ensure it is a block event, + // deserialize it, and report it to the different rescan + // clients. + eventType := string(msgBytes[0]) + switch eventType { + case "rawblock": + block := &wire.MsgBlock{} + r := bytes.NewReader(msgBytes[1]) + if err := block.Deserialize(r); err != nil { + log.Errorf("Unable to deserialize block: %v", + err) + continue + } + + c.rescanClientsMtx.Lock() + for _, client := range c.rescanClients { + select { + case client.zmqBlockNtfns <- block: + case <-client.quit: + case <-c.quit: + c.rescanClientsMtx.Unlock() + return + } + } + c.rescanClientsMtx.Unlock() + default: + log.Warnf("Received unexpected event type from "+ + "rawblock subscription: %v", eventType) + } + } +} + +// txEventHandler reads raw blocks events from the ZMQ block socket and forwards +// them along to the current rescan clients. +// +// NOTE: This must be run as a goroutine. +func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) { + defer c.wg.Done() + defer conn.Close() + + log.Info("Started listening for bitcoind transaction notifications "+ + "via ZMQ on ", c.zmqTxHost) + + for { + // Before attempting to read from the ZMQ socket, we'll make + // sure to check if we've been requested to shut down. + select { + case <-c.quit: + return + default: + } + + // Poll an event from the ZMQ socket. It's possible that the + // connection to the socket continuously times out, so we'll + // prevent logging this error to prevent spamming the logs. + msgBytes, err := conn.Receive() + if err != nil { + err, ok := err.(net.Error) + if !ok || !err.Timeout() { + log.Error(err) + } + + continue + } + + // We have an event! We'll now ensure it is a transaction event, + // deserialize it, and report it to the different rescan + // clients. + eventType := string(msgBytes[0]) + switch eventType { + case "rawtx": + tx := &wire.MsgTx{} + r := bytes.NewReader(msgBytes[1]) + if err := tx.Deserialize(r); err != nil { + log.Errorf("Unable to deserialize "+ + "transaction: %v", err) + continue + } + + c.rescanClientsMtx.Lock() + for _, client := range c.rescanClients { + select { + case client.zmqTxNtfns <- tx: + case <-client.quit: + case <-c.quit: + c.rescanClientsMtx.Unlock() + return + } + } + c.rescanClientsMtx.Unlock() + default: + log.Warnf("Received unexpected event type from rawtx "+ + "subscription: %v", eventType) + } + } +} + +// getCurrentNet returns the network on which the bitcoind node is running. +func (c *BitcoindConn) getCurrentNet() (wire.BitcoinNet, error) { + hash, err := c.client.GetBlockHash(0) + if err != nil { + return 0, err + } + + switch *hash { + case *chaincfg.TestNet3Params.GenesisHash: + return chaincfg.TestNet3Params.Net, nil + case *chaincfg.RegressionNetParams.GenesisHash: + return chaincfg.RegressionNetParams.Net, nil + case *chaincfg.MainNetParams.GenesisHash: + return chaincfg.MainNetParams.Net, nil + default: + return 0, fmt.Errorf("unknown network with genesis hash %v", hash) + } +} + +// NewBitcoindClient returns a bitcoind client using the current bitcoind +// connection. This allows us to share the same connection using multiple +// clients. The birthday signifies the earliest time for which we should begin +// scanning the chain. +func (c *BitcoindConn) NewBitcoindClient(birthday time.Time) *BitcoindClient { + return &BitcoindClient{ + quit: make(chan struct{}), + + id: atomic.AddUint64(&c.rescanClientCounter, 1), + + birthday: birthday, + chainParams: c.chainParams, + chainConn: c, + + rescanUpdate: make(chan interface{}), + watchedAddresses: make(map[string]struct{}), + watchedOutPoints: make(map[wire.OutPoint]struct{}), + watchedTxs: make(map[chainhash.Hash]struct{}), + + notificationQueue: NewConcurrentQueue(20), + zmqTxNtfns: make(chan *wire.MsgTx), + zmqBlockNtfns: make(chan *wire.MsgBlock), + + mempool: make(map[chainhash.Hash]struct{}), + expiredMempool: make(map[int32]map[chainhash.Hash]struct{}), + } +} + +// AddClient adds a client to the set of active rescan clients of the current +// chain connection. This allows the connection to include the specified client +// in its notification delivery. +// +// NOTE: This function is safe for concurrent access. +func (c *BitcoindConn) AddClient(client *BitcoindClient) { + c.rescanClientsMtx.Lock() + defer c.rescanClientsMtx.Unlock() + + c.rescanClients[client.id] = client +} + +// RemoveClient removes the client with the given ID from the set of active +// rescan clients. Once removed, the client will no longer receive block and +// transaction notifications from the chain connection. +// +// NOTE: This function is safe for concurrent access. +func (c *BitcoindConn) RemoveClient(id uint64) { + c.rescanClientsMtx.Lock() + defer c.rescanClientsMtx.Unlock() + + delete(c.rescanClients, id) +} From 8357e86a4d6dbfc4a3635f4de9a2ec8c805139f0 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 17:47:10 -0700 Subject: [PATCH 09/11] chain: match transaction against currently watched transactions In this commit, we extend the client's filtering process to also look at the set of currently watched transactions. The logic to watch for transaction hashes was previously there, but it was not used to filter against incoming transactions. --- chain/bitcoind_client.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index e0cb90a..a136c00 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -1217,6 +1217,14 @@ func (c *BitcoindClient) filterTx(tx *wire.MsgTx, } // If the transaction didn't pay to any of our watched addresses, we'll + // check if we're currently watching for the hash of this transaction. + if !isRelevant { + if _, ok := c.watchedTxs[tx.TxHash()]; ok { + isRelevant = true + } + } + + // If the transaction didn't pay to any of our watched hashes, we'll // check if it spends any of our watched outpoints. if !isRelevant { for _, in := range tx.TxIn { From bbb5a6c058ed84c5ddfac73879b9bedbe110319a Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 21 Jun 2018 17:48:27 -0700 Subject: [PATCH 10/11] wallet: remove the need to set the birthday for bitcoind chain clients Due to the previous commit allowing us to specify the birthday of the wallet at the time of the BitcoindClient's creation, this is now unnecessary. --- wallet/wallet.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/wallet/wallet.go b/wallet/wallet.go index a9948ec..8411494 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -173,8 +173,6 @@ func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) { switch cc := chainClient.(type) { case *chain.NeutrinoClient: cc.SetStartTime(w.Manager.Birthday()) - case *chain.BitcoindClient: - cc.SetStartTime(w.Manager.Birthday()) } w.chainClientLock.Unlock() From dec9978ca21944c6f030112dc4a5810c506e0fd9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 13 Jul 2018 19:09:23 -0700 Subject: [PATCH 11/11] wallet: stop handling chain notifications once wallet has stopped In this commit, we alter the behavior for handling chain notifications within the wallet. The previous code would assume that the channel would close, but due to now using a ConcurrentQueue to handle notifications, this assumption no longer stands. Now, we'll stop handling notifications either once the wallet has or stopped or once the notifications channel has been closed. --- wallet/chainntfns.go | 147 +++++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 69 deletions(-) diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index e110cb2..670e136 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -16,10 +16,11 @@ import ( ) func (w *Wallet) handleChainNotifications() { + defer w.wg.Done() + chainClient, err := w.requireChainClient() if err != nil { log.Errorf("handleChainNotifications called without RPC client") - w.wg.Done() return } @@ -84,80 +85,88 @@ func (w *Wallet) handleChainNotifications() { return err } - for n := range chainClient.Notifications() { - var notificationName string - var err error - switch n := n.(type) { - case chain.ClientConnected: - go sync(w) - case chain.BlockConnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.connectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockconnected" - case chain.BlockDisconnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockdisconnected" - case chain.RelevantTx: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.addRelevantTx(tx, n.TxRecord, n.Block) - }) - notificationName = "recvtx/redeemingtx" - case chain.FilteredBlockConnected: - // Atomically update for the whole block. - if len(n.RelevantTxs) > 0 { - err = walletdb.Update(w.db, func( - tx walletdb.ReadWriteTx) error { - var err error - for _, rec := range n.RelevantTxs { - err = w.addRelevantTx(tx, rec, - n.Block) - if err != nil { - return err - } - } - return nil - }) + for { + select { + case n, ok := <-chainClient.Notifications(): + if !ok { + return } - notificationName = "filteredblockconnected" - // The following require some database maintenance, but also - // need to be reported to the wallet's rescan goroutine. - case *chain.RescanProgress: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return + var notificationName string + var err error + switch n := n.(type) { + case chain.ClientConnected: + go sync(w) + case chain.BlockConnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.connectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockconnected" + case chain.BlockDisconnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockdisconnected" + case chain.RelevantTx: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.addRelevantTx(tx, n.TxRecord, n.Block) + }) + notificationName = "recvtx/redeemingtx" + case chain.FilteredBlockConnected: + // Atomically update for the whole block. + if len(n.RelevantTxs) > 0 { + err = walletdb.Update(w.db, func( + tx walletdb.ReadWriteTx) error { + var err error + for _, rec := range n.RelevantTxs { + err = w.addRelevantTx(tx, rec, + n.Block) + if err != nil { + return err + } + } + return nil + }) + } + notificationName = "filteredblockconnected" + + // The following require some database maintenance, but also + // need to be reported to the wallet's rescan goroutine. + case *chain.RescanProgress: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } + case *chain.RescanFinished: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + w.SetChainSynced(true) + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } } - case *chain.RescanFinished: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - w.SetChainSynced(true) - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return - } - } - if err != nil { - // On out-of-sync blockconnected notifications, only - // send a debug message. - errStr := "Failed to process consensus server " + - "notification (name: `%s`, detail: `%v`)" - if notificationName == "blockconnected" && - strings.Contains(err.Error(), - "couldn't get hash from database") { - log.Debugf(errStr, notificationName, err) - } else { - log.Errorf(errStr, notificationName, err) + if err != nil { + // On out-of-sync blockconnected notifications, only + // send a debug message. + errStr := "Failed to process consensus server " + + "notification (name: `%s`, detail: `%v`)" + if notificationName == "blockconnected" && + strings.Contains(err.Error(), + "couldn't get hash from database") { + log.Debugf(errStr, notificationName, err) + } else { + log.Errorf(errStr, notificationName, err) + } } + case <-w.quit: + return } } - w.wg.Done() } // connectBlock handles a chain server notification by marking a wallet