diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index e110cb2..670e136 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -16,10 +16,11 @@ import ( ) func (w *Wallet) handleChainNotifications() { + defer w.wg.Done() + chainClient, err := w.requireChainClient() if err != nil { log.Errorf("handleChainNotifications called without RPC client") - w.wg.Done() return } @@ -84,80 +85,88 @@ func (w *Wallet) handleChainNotifications() { return err } - for n := range chainClient.Notifications() { - var notificationName string - var err error - switch n := n.(type) { - case chain.ClientConnected: - go sync(w) - case chain.BlockConnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.connectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockconnected" - case chain.BlockDisconnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockdisconnected" - case chain.RelevantTx: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.addRelevantTx(tx, n.TxRecord, n.Block) - }) - notificationName = "recvtx/redeemingtx" - case chain.FilteredBlockConnected: - // Atomically update for the whole block. - if len(n.RelevantTxs) > 0 { - err = walletdb.Update(w.db, func( - tx walletdb.ReadWriteTx) error { - var err error - for _, rec := range n.RelevantTxs { - err = w.addRelevantTx(tx, rec, - n.Block) - if err != nil { - return err - } - } - return nil - }) + for { + select { + case n, ok := <-chainClient.Notifications(): + if !ok { + return } - notificationName = "filteredblockconnected" - // The following require some database maintenance, but also - // need to be reported to the wallet's rescan goroutine. - case *chain.RescanProgress: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return + var notificationName string + var err error + switch n := n.(type) { + case chain.ClientConnected: + go sync(w) + case chain.BlockConnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.connectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockconnected" + case chain.BlockDisconnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockdisconnected" + case chain.RelevantTx: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.addRelevantTx(tx, n.TxRecord, n.Block) + }) + notificationName = "recvtx/redeemingtx" + case chain.FilteredBlockConnected: + // Atomically update for the whole block. + if len(n.RelevantTxs) > 0 { + err = walletdb.Update(w.db, func( + tx walletdb.ReadWriteTx) error { + var err error + for _, rec := range n.RelevantTxs { + err = w.addRelevantTx(tx, rec, + n.Block) + if err != nil { + return err + } + } + return nil + }) + } + notificationName = "filteredblockconnected" + + // The following require some database maintenance, but also + // need to be reported to the wallet's rescan goroutine. + case *chain.RescanProgress: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } + case *chain.RescanFinished: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + w.SetChainSynced(true) + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } } - case *chain.RescanFinished: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - w.SetChainSynced(true) - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return - } - } - if err != nil { - // On out-of-sync blockconnected notifications, only - // send a debug message. - errStr := "Failed to process consensus server " + - "notification (name: `%s`, detail: `%v`)" - if notificationName == "blockconnected" && - strings.Contains(err.Error(), - "couldn't get hash from database") { - log.Debugf(errStr, notificationName, err) - } else { - log.Errorf(errStr, notificationName, err) + if err != nil { + // On out-of-sync blockconnected notifications, only + // send a debug message. + errStr := "Failed to process consensus server " + + "notification (name: `%s`, detail: `%v`)" + if notificationName == "blockconnected" && + strings.Contains(err.Error(), + "couldn't get hash from database") { + log.Debugf(errStr, notificationName, err) + } else { + log.Errorf(errStr, notificationName, err) + } } + case <-w.quit: + return } } - w.wg.Done() } // connectBlock handles a chain server notification by marking a wallet