wallet: stop handling chain notifications once wallet has stopped

In this commit, we alter the behavior for handling chain notifications
within the wallet. The previous code would assume that the channel would
close, but due to now using a ConcurrentQueue to handle notifications,
this assumption no longer stands. Now, we'll stop handling notifications
either once the wallet has or stopped or once the notifications channel
has been closed.
This commit is contained in:
Wilmer Paulino 2018-07-13 19:09:23 -07:00
parent bbb5a6c058
commit dec9978ca2

View file

@ -16,10 +16,11 @@ import (
) )
func (w *Wallet) handleChainNotifications() { func (w *Wallet) handleChainNotifications() {
defer w.wg.Done()
chainClient, err := w.requireChainClient() chainClient, err := w.requireChainClient()
if err != nil { if err != nil {
log.Errorf("handleChainNotifications called without RPC client") log.Errorf("handleChainNotifications called without RPC client")
w.wg.Done()
return return
} }
@ -84,80 +85,88 @@ func (w *Wallet) handleChainNotifications() {
return err return err
} }
for n := range chainClient.Notifications() { for {
var notificationName string select {
var err error case n, ok := <-chainClient.Notifications():
switch n := n.(type) { if !ok {
case chain.ClientConnected: return
go sync(w)
case chain.BlockConnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.connectBlock(tx, wtxmgr.BlockMeta(n))
})
notificationName = "blockconnected"
case chain.BlockDisconnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.disconnectBlock(tx, wtxmgr.BlockMeta(n))
})
notificationName = "blockdisconnected"
case chain.RelevantTx:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
})
notificationName = "recvtx/redeemingtx"
case chain.FilteredBlockConnected:
// Atomically update for the whole block.
if len(n.RelevantTxs) > 0 {
err = walletdb.Update(w.db, func(
tx walletdb.ReadWriteTx) error {
var err error
for _, rec := range n.RelevantTxs {
err = w.addRelevantTx(tx, rec,
n.Block)
if err != nil {
return err
}
}
return nil
})
} }
notificationName = "filteredblockconnected"
// The following require some database maintenance, but also var notificationName string
// need to be reported to the wallet's rescan goroutine. var err error
case *chain.RescanProgress: switch n := n.(type) {
err = catchUpHashes(w, chainClient, n.Height) case chain.ClientConnected:
notificationName = "rescanprogress" go sync(w)
select { case chain.BlockConnected:
case w.rescanNotifications <- n: err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
case <-w.quitChan(): return w.connectBlock(tx, wtxmgr.BlockMeta(n))
return })
notificationName = "blockconnected"
case chain.BlockDisconnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.disconnectBlock(tx, wtxmgr.BlockMeta(n))
})
notificationName = "blockdisconnected"
case chain.RelevantTx:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
})
notificationName = "recvtx/redeemingtx"
case chain.FilteredBlockConnected:
// Atomically update for the whole block.
if len(n.RelevantTxs) > 0 {
err = walletdb.Update(w.db, func(
tx walletdb.ReadWriteTx) error {
var err error
for _, rec := range n.RelevantTxs {
err = w.addRelevantTx(tx, rec,
n.Block)
if err != nil {
return err
}
}
return nil
})
}
notificationName = "filteredblockconnected"
// The following require some database maintenance, but also
// need to be reported to the wallet's rescan goroutine.
case *chain.RescanProgress:
err = catchUpHashes(w, chainClient, n.Height)
notificationName = "rescanprogress"
select {
case w.rescanNotifications <- n:
case <-w.quitChan():
return
}
case *chain.RescanFinished:
err = catchUpHashes(w, chainClient, n.Height)
notificationName = "rescanprogress"
w.SetChainSynced(true)
select {
case w.rescanNotifications <- n:
case <-w.quitChan():
return
}
} }
case *chain.RescanFinished: if err != nil {
err = catchUpHashes(w, chainClient, n.Height) // On out-of-sync blockconnected notifications, only
notificationName = "rescanprogress" // send a debug message.
w.SetChainSynced(true) errStr := "Failed to process consensus server " +
select { "notification (name: `%s`, detail: `%v`)"
case w.rescanNotifications <- n: if notificationName == "blockconnected" &&
case <-w.quitChan(): strings.Contains(err.Error(),
return "couldn't get hash from database") {
} log.Debugf(errStr, notificationName, err)
} } else {
if err != nil { log.Errorf(errStr, notificationName, err)
// On out-of-sync blockconnected notifications, only }
// send a debug message.
errStr := "Failed to process consensus server " +
"notification (name: `%s`, detail: `%v`)"
if notificationName == "blockconnected" &&
strings.Contains(err.Error(),
"couldn't get hash from database") {
log.Debugf(errStr, notificationName, err)
} else {
log.Errorf(errStr, notificationName, err)
} }
case <-w.quit:
return
} }
} }
w.wg.Done()
} }
// connectBlock handles a chain server notification by marking a wallet // connectBlock handles a chain server notification by marking a wallet