diff --git a/acctmgr.go b/acctmgr.go index c25db0c..82bedb3 100644 --- a/acctmgr.go +++ b/acctmgr.go @@ -949,9 +949,15 @@ func (am *AccountManager) ListUnspent(minconf, maxconf int, } // RescanActiveAddresses begins a rescan for all active addresses for -// each account. -func (am *AccountManager) RescanActiveAddresses() error { +// each account. If markBestBlock is non-nil, the block described by +// the blockstamp is used to mark the synced-with height of the wallet +// just before the rescan is submitted and started. This allows the +// caller to mark the progress that the rescan is expected to complete +// through, if the account otherwise does not contain any recently +// seen blocks. +func (am *AccountManager) RescanActiveAddresses(markBestBlock *wallet.BlockStamp) error { var job *RescanJob + var defaultAcct *Account for _, a := range am.AllAccounts() { acctJob, err := a.RescanActiveJob() if err != nil { @@ -962,8 +968,16 @@ func (am *AccountManager) RescanActiveAddresses() error { } else { job.Merge(acctJob) } + + if a.name == "" { + defaultAcct = a + } } if job != nil { + if markBestBlock != nil { + defaultAcct.SetSyncedWith(markBestBlock) + } + // Submit merged job and block until rescan completes. jobFinished := am.rm.SubmitJob(job) <-jobFinished diff --git a/rescan.go b/rescan.go index c8e44a8..a13e3ab 100644 --- a/rescan.go +++ b/rescan.go @@ -140,10 +140,6 @@ func (b *rescanBatch) merge(job *RescanJob) { } } -type rescanFinished struct { - error -} - // jobHandler runs the RescanManager's for-select loop to manage rescan jobs // and dispatch requests. func (m *RescanManager) jobHandler() { @@ -224,11 +220,13 @@ func (m *RescanManager) rpcHandler() { } client, err := accessClient() if err != nil { - m.status <- rescanFinished{err} + m.MarkFinished(rescanFinished{err}) return } err = client.Rescan(job.StartHeight, addrs, job.OutPoints) - m.status <- rescanFinished{err} + if err != nil { + m.MarkFinished(rescanFinished{err}) + } } } @@ -268,3 +266,9 @@ func (m *RescanManager) SubmitJob(job *RescanJob) <-chan struct{} { func (m *RescanManager) MarkProgress(height rescanProgress) { m.status <- height } + +// MarkFinished messages the RescanManager that the currently running rescan +// finished, or errored prematurely. +func (m *RescanManager) MarkFinished(finished rescanFinished) { + m.status <- finished +} diff --git a/rpcclient.go b/rpcclient.go index 97792fc..cfc7cd8 100644 --- a/rpcclient.go +++ b/rpcclient.go @@ -61,8 +61,6 @@ const ( maxUnhandledNotifications = 50 ) -type notificationChan chan notification - type blockSummary struct { hash *btcwire.ShaHash height int32 @@ -87,9 +85,14 @@ type ( blockDisconnected blockSummary recvTx acceptedTx redeemingTx acceptedTx - rescanProgress int32 + rescanFinished struct { + error + } + rescanProgress int32 ) +type notificationChan chan notification + func (c notificationChan) onBlockConnected(hash *btcwire.ShaHash, height int32) { c <- (blockConnected)(blockSummary{hash, height}) } @@ -106,6 +109,10 @@ func (c notificationChan) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetail c <- redeemingTx{tx, block} } +func (c notificationChan) onRescanFinished(height int32) { + c <- rescanFinished{error: nil} +} + func (c notificationChan) onRescanProgress(height int32) { c <- rescanProgress(height) } @@ -133,7 +140,9 @@ func (n blockConnected) handleNotification() error { wg.Wait() NotifyBalanceSyncerChans.remove <- *n.hash } + AcctMgr.Grab() AcctMgr.BlockNotify(bs) + AcctMgr.Release() // Pass notification to wallet clients too. if server != nil { @@ -158,6 +167,9 @@ func (n blockConnected) MarshalJSON() ([]byte, error) { } func (n blockDisconnected) handleNotification() error { + AcctMgr.Grab() + defer AcctMgr.Release() + // Rollback Utxo and Tx data stores. if err := AcctMgr.Rollback(n.height, n.hash); err != nil { return err @@ -233,6 +245,9 @@ func (n recvTx) handleNotification() error { SendTxHistSyncChans.remove <- *n.tx.Sha() } + AcctMgr.Grab() + defer AcctMgr.Release() + // For every output, find all accounts handling that output address (if any) // and record the received txout. for outIdx, txout := range n.tx.MsgTx().TxOut { @@ -301,7 +316,16 @@ func (n redeemingTx) handleNotification() error { return InvalidNotificationError{err} } n.tx.SetIndex(txIdx) - return AcctMgr.RecordSpendingTx(n.tx, block) + + AcctMgr.Grab() + err = AcctMgr.RecordSpendingTx(n.tx, block) + AcctMgr.Release() + return err +} + +func (n rescanFinished) handleNotification() error { + AcctMgr.rm.MarkFinished(n) + return nil } func (n rescanProgress) handleNotification() error { @@ -339,6 +363,7 @@ func newRPCClient(certs []byte) (*rpcClient, error) { OnBlockDisconnected: ntfns.onBlockDisconnected, OnRecvTx: ntfns.onRecvTx, OnRedeemingTx: ntfns.onRedeemingTx, + OnRescanFinished: ntfns.onRescanFinished, OnRescanProgress: ntfns.onRescanProgress, } conf := btcrpcclient.ConnConfig{ @@ -377,7 +402,6 @@ func (c *rpcClient) WaitForShutdown() { func (c *rpcClient) handleNotifications() { for n := range c.chainNotifications { - AcctMgr.Grab() err := n.handleNotification() if err != nil { switch e := err.(type) { @@ -387,7 +411,6 @@ func (c *rpcClient) handleNotifications() { log.Errorf("Cannot handle notification: %v", e) } } - AcctMgr.Release() } c.wg.Done() } @@ -473,7 +496,7 @@ func (c *rpcClient) Handshake() error { // Begin tracking wallets against this btcd instance. AcctMgr.Track() - if err := AcctMgr.RescanActiveAddresses(); err != nil { + if err := AcctMgr.RescanActiveAddresses(nil); err != nil { return err } // TODO: Only begin tracking new unspent outputs as a result @@ -492,7 +515,7 @@ func (c *rpcClient) Handshake() error { // Iterator was invalid (wallet has never been synced) or there was a // huge chain fork + reorg (more than 20 blocks). AcctMgr.Track() - if err := AcctMgr.RescanActiveAddresses(); err != nil { + if err := AcctMgr.RescanActiveAddresses(&bs); err != nil { return err } // TODO: only begin tracking new unspent outputs as a result of the