wallet: make wallet initial sync synchronous

This ensures the wallet can properly do an initial sync, a recovery, or
detect if it's on a stale branch before attempting to process new blocks
at tip.

Since the rescan will be triggered synchronously as well, we'll need to
catch the wallet's quit chan when handling rescan batches in order to
allow for clean shutdowns.
This commit is contained in:
Wilmer Paulino 2019-05-14 13:16:58 -07:00
parent f2f46b674d
commit 1ee2a239de
No known key found for this signature in database
GPG key ID: 6DF57B9F9514972F
3 changed files with 62 additions and 32 deletions

View file

@ -35,17 +35,6 @@ func (w *Wallet) handleChainNotifications() {
return return
} }
sync := func(w *Wallet, birthdayStamp *waddrmgr.BlockStamp) {
// At the moment there is no recourse if the rescan fails for
// some reason, however, the wallet will not be marked synced
// and many methods will error early since the wallet is known
// to be out of date.
err := w.syncWithChain(birthdayStamp)
if err != nil && !w.ShuttingDown() {
log.Warnf("Unable to synchronize wallet to chain: %v", err)
}
}
catchUpHashes := func(w *Wallet, client chain.Interface, catchUpHashes := func(w *Wallet, client chain.Interface,
height int32) error { height int32) error {
// TODO(aakselrod): There's a race conditon here, which // TODO(aakselrod): There's a race conditon here, which
@ -119,14 +108,16 @@ func (w *Wallet) handleChainNotifications() {
chainClient, birthdayStore, chainClient, birthdayStore,
) )
if err != nil && !waddrmgr.IsError(err, waddrmgr.ErrBirthdayBlockNotSet) { if err != nil && !waddrmgr.IsError(err, waddrmgr.ErrBirthdayBlockNotSet) {
err := fmt.Errorf("unable to sanity "+ panic(fmt.Errorf("Unable to sanity "+
"check wallet birthday block: %v", "check wallet birthday block: %v",
err) err))
log.Error(err)
panic(err)
} }
go sync(w, birthdayBlock) err = w.syncWithChain(birthdayBlock)
if err != nil && !w.ShuttingDown() {
panic(fmt.Errorf("Unable to synchronize "+
"wallet to chain: %v", err))
}
case chain.BlockConnected: case chain.BlockConnected:
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
return w.connectBlock(tx, wtxmgr.BlockMeta(n)) return w.connectBlock(tx, wtxmgr.BlockMeta(n))

View file

@ -56,7 +56,11 @@ type rescanBatch struct {
func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error { func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error {
errChan := make(chan error, 1) errChan := make(chan error, 1)
job.err = errChan job.err = errChan
w.rescanAddJob <- job select {
case w.rescanAddJob <- job:
case <-w.quitChan():
errChan <- ErrWalletShuttingDown
}
return errChan return errChan
} }
@ -103,10 +107,11 @@ func (b *rescanBatch) done(err error) {
// submissions, and possibly batching many waiting requests together so they // submissions, and possibly batching many waiting requests together so they
// can be handled by a single rescan after the current one completes. // can be handled by a single rescan after the current one completes.
func (w *Wallet) rescanBatchHandler() { func (w *Wallet) rescanBatchHandler() {
defer w.wg.Done()
var curBatch, nextBatch *rescanBatch var curBatch, nextBatch *rescanBatch
quit := w.quitChan() quit := w.quitChan()
out:
for { for {
select { select {
case job := <-w.rescanAddJob: case job := <-w.rescanAddJob:
@ -114,7 +119,12 @@ out:
// Set current batch as this job and send // Set current batch as this job and send
// request. // request.
curBatch = job.batch() curBatch = job.batch()
w.rescanBatch <- curBatch select {
case w.rescanBatch <- curBatch:
case <-quit:
job.err <- ErrWalletShuttingDown
return
}
} else { } else {
// Create next batch if it doesn't exist, or // Create next batch if it doesn't exist, or
// merge the job. // merge the job.
@ -134,9 +144,16 @@ out:
"currently running") "currently running")
continue continue
} }
w.rescanProgress <- &RescanProgressMsg{ select {
case w.rescanProgress <- &RescanProgressMsg{
Addresses: curBatch.addrs, Addresses: curBatch.addrs,
Notification: n, Notification: n,
}:
case <-quit:
for _, errChan := range curBatch.errChans {
errChan <- ErrWalletShuttingDown
}
return
} }
case *chain.RescanFinished: case *chain.RescanFinished:
@ -146,15 +163,29 @@ out:
"currently running") "currently running")
continue continue
} }
w.rescanFinished <- &RescanFinishedMsg{ select {
case w.rescanFinished <- &RescanFinishedMsg{
Addresses: curBatch.addrs, Addresses: curBatch.addrs,
Notification: n, Notification: n,
}:
case <-quit:
for _, errChan := range curBatch.errChans {
errChan <- ErrWalletShuttingDown
}
return
} }
curBatch, nextBatch = nextBatch, nil curBatch, nextBatch = nextBatch, nil
if curBatch != nil { if curBatch != nil {
w.rescanBatch <- curBatch select {
case w.rescanBatch <- curBatch:
case <-quit:
for _, errChan := range curBatch.errChans {
errChan <- ErrWalletShuttingDown
}
return
}
} }
default: default:
@ -163,11 +194,9 @@ out:
} }
case <-quit: case <-quit:
break out return
} }
} }
w.wg.Done()
} }
// rescanProgressHandler handles notifications for partially and fully completed // rescanProgressHandler handles notifications for partially and fully completed
@ -280,5 +309,10 @@ func (w *Wallet) rescanWithTarget(addrs []btcutil.Address,
} }
// Submit merged job and block until rescan completes. // Submit merged job and block until rescan completes.
return <-w.SubmitRescan(job) select {
case err := <-w.SubmitRescan(job):
return err
case <-w.quitChan():
return ErrWalletShuttingDown
}
} }

View file

@ -54,13 +54,18 @@ const (
recoveryBatchSize = 2000 recoveryBatchSize = 2000
) )
// ErrNotSynced describes an error where an operation cannot complete
// due wallet being out of sync (and perhaps currently syncing with)
// the remote chain server.
var ErrNotSynced = errors.New("wallet is not synchronized with the chain server")
// Namespace bucket keys.
var ( var (
// ErrNotSynced describes an error where an operation cannot complete
// due wallet being out of sync (and perhaps currently syncing with)
// the remote chain server.
ErrNotSynced = errors.New("wallet is not synchronized with the chain server")
// ErrWalletShuttingDown is an error returned when we attempt to make a
// request to the wallet but it is in the process of or has already shut
// down.
ErrWalletShuttingDown = errors.New("wallet shutting down")
// Namespace bucket keys.
waddrmgrNamespaceKey = []byte("waddrmgr") waddrmgrNamespaceKey = []byte("waddrmgr")
wtxmgrNamespaceKey = []byte("wtxmgr") wtxmgrNamespaceKey = []byte("wtxmgr")
) )