From dec9978ca21944c6f030112dc4a5810c506e0fd9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 13 Jul 2018 19:09:23 -0700 Subject: [PATCH] wallet: stop handling chain notifications once wallet has stopped In this commit, we alter the behavior for handling chain notifications within the wallet. The previous code would assume that the channel would close, but due to now using a ConcurrentQueue to handle notifications, this assumption no longer stands. Now, we'll stop handling notifications either once the wallet has or stopped or once the notifications channel has been closed. --- wallet/chainntfns.go | 147 +++++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 69 deletions(-) diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index e110cb2..670e136 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -16,10 +16,11 @@ import ( ) func (w *Wallet) handleChainNotifications() { + defer w.wg.Done() + chainClient, err := w.requireChainClient() if err != nil { log.Errorf("handleChainNotifications called without RPC client") - w.wg.Done() return } @@ -84,80 +85,88 @@ func (w *Wallet) handleChainNotifications() { return err } - for n := range chainClient.Notifications() { - var notificationName string - var err error - switch n := n.(type) { - case chain.ClientConnected: - go sync(w) - case chain.BlockConnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.connectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockconnected" - case chain.BlockDisconnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockdisconnected" - case chain.RelevantTx: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.addRelevantTx(tx, n.TxRecord, n.Block) - }) - notificationName = "recvtx/redeemingtx" - case chain.FilteredBlockConnected: - // Atomically update for the whole block. - if len(n.RelevantTxs) > 0 { - err = walletdb.Update(w.db, func( - tx walletdb.ReadWriteTx) error { - var err error - for _, rec := range n.RelevantTxs { - err = w.addRelevantTx(tx, rec, - n.Block) - if err != nil { - return err - } - } - return nil - }) + for { + select { + case n, ok := <-chainClient.Notifications(): + if !ok { + return } - notificationName = "filteredblockconnected" - // The following require some database maintenance, but also - // need to be reported to the wallet's rescan goroutine. - case *chain.RescanProgress: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return + var notificationName string + var err error + switch n := n.(type) { + case chain.ClientConnected: + go sync(w) + case chain.BlockConnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.connectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockconnected" + case chain.BlockDisconnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockdisconnected" + case chain.RelevantTx: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.addRelevantTx(tx, n.TxRecord, n.Block) + }) + notificationName = "recvtx/redeemingtx" + case chain.FilteredBlockConnected: + // Atomically update for the whole block. + if len(n.RelevantTxs) > 0 { + err = walletdb.Update(w.db, func( + tx walletdb.ReadWriteTx) error { + var err error + for _, rec := range n.RelevantTxs { + err = w.addRelevantTx(tx, rec, + n.Block) + if err != nil { + return err + } + } + return nil + }) + } + notificationName = "filteredblockconnected" + + // The following require some database maintenance, but also + // need to be reported to the wallet's rescan goroutine. + case *chain.RescanProgress: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } + case *chain.RescanFinished: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + w.SetChainSynced(true) + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } } - case *chain.RescanFinished: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - w.SetChainSynced(true) - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return - } - } - if err != nil { - // On out-of-sync blockconnected notifications, only - // send a debug message. - errStr := "Failed to process consensus server " + - "notification (name: `%s`, detail: `%v`)" - if notificationName == "blockconnected" && - strings.Contains(err.Error(), - "couldn't get hash from database") { - log.Debugf(errStr, notificationName, err) - } else { - log.Errorf(errStr, notificationName, err) + if err != nil { + // On out-of-sync blockconnected notifications, only + // send a debug message. + errStr := "Failed to process consensus server " + + "notification (name: `%s`, detail: `%v`)" + if notificationName == "blockconnected" && + strings.Contains(err.Error(), + "couldn't get hash from database") { + log.Debugf(errStr, notificationName, err) + } else { + log.Errorf(errStr, notificationName, err) + } } + case <-w.quit: + return } } - w.wg.Done() } // connectBlock handles a chain server notification by marking a wallet