diff --git a/account.go b/account.go index 0ab7344..66fa187 100644 --- a/account.go +++ b/account.go @@ -372,7 +372,6 @@ func (a *Account) ImportPrivateKey(pk []byte, compressed bool, if err != nil { return "", err } - addrStr := addr.EncodeAddress() // Immediately write wallet to disk. AcctMgr.ds.ScheduleWalletWrite(a) @@ -380,41 +379,25 @@ func (a *Account) ImportPrivateKey(pk []byte, compressed bool, return "", fmt.Errorf("cannot write account: %v", err) } + addrStr := addr.EncodeAddress() + // Rescan blockchain for transactions with txout scripts paying to the // imported address. - // - // TODO(jrick): As btcd only allows a single rescan per websocket client - // to run at any given time, a separate goroutine should run for - // exclusively handling rescan events. if rescan { - go func(addr btcutil.Address, aname string) { - addrStr := addr.EncodeAddress() - log.Infof("Beginning rescan (height %d) for address %s", - bs.Height, addrStr) + addrs := []btcutil.Address{addr} + job := &RescanJob{ + Addresses: map[*Account][]btcutil.Address{a: addrs}, + OutPoints: nil, + StartHeight: 0, + } - jsonErr := Rescan(CurrentServerConn(), bs.Height, - []string{addrStr}, nil) - if jsonErr != nil { - log.Errorf("Rescan for imported address %s failed: %v", - addrStr, jsonErr.Message) - return - } - - AcctMgr.Grab() - defer AcctMgr.Release() - a, err := AcctMgr.Account(aname) - if err != nil { - log.Errorf("Account for imported address %s missing: %v", - addrStr, err) - return - } - if err := a.MarkAddressSynced(addr); err != nil { - log.Errorf("Unable to mark rescanned address as synced: %v", err) - return - } - AcctMgr.ds.FlushAccount(a) - log.Infof("Finished rescan for imported address %s", addrStr) - }(addr, a.name) + // Submit rescan job and log when the import has completed. + // Do not block on finishing the rescan. + doneChan := AcctMgr.rm.SubmitJob(job) + go func() { + <-doneChan + log.Infof("Finished import for address %s", addrStr) + }() } // Associate the imported address with this account. @@ -498,13 +481,13 @@ func (a *Account) Track() { } } -// RescanActiveAddresses requests btcd to rescan the blockchain for new -// transactions to all active wallet addresses. This is needed for -// catching btcwallet up to a long-running btcd process, as otherwise -// it would have missed notifications as blocks are attached to the -// main chain. -func (a *Account) RescanActiveAddresses() { - // Determine the block to begin the rescan from. +// RescanActiveJob creates a RescanJob for all active addresses in the +// account. This is needed for catching btcwallet up to a long-running +// btcd process, as otherwise it would have missed notifications as +// blocks are attached to the main chain. +func (a *Account) RescanActiveJob() *RescanJob { + // Determine the block necesary to start the rescan for all active + // addresses. height := int32(0) if a.fullRescan { // Need to perform a complete rescan since the wallet creation @@ -516,25 +499,23 @@ func (a *Account) RescanActiveAddresses() { height = a.SyncHeight() } - log.Infof("Beginning rescan (height %d) for account '%v'", - height, a.name) - - // Rescan active addresses starting at the determined block height. - addrs := a.SortedActiveAddresses() - addrStrs := make([]string, 0, len(addrs)) - for i := range addrs { - addrStrs = append(addrStrs, addrs[i].Address().EncodeAddress()) + actives := a.SortedActiveAddresses() + addrs := make([]btcutil.Address, 0, len(actives)) + for i := range actives { + addrs = append(addrs, actives[i].Address()) } - unspentRecvTxOuts := a.TxStore.UnspentOutputs() - unspentOutPoints := make([]*btcwire.OutPoint, 0, len(unspentRecvTxOuts)) - for _, record := range unspentRecvTxOuts { - unspentOutPoints = append(unspentOutPoints, record.OutPoint()) - } - Rescan(CurrentServerConn(), height, addrStrs, unspentOutPoints) - a.MarkAllSynced() - AcctMgr.ds.FlushAccount(a) - log.Infof("Finished rescan for account '%v'", a.name) + unspents := a.TxStore.UnspentOutputs() + outpoints := make([]*btcwire.OutPoint, 0, len(unspents)) + for i := range unspents { + outpoints = append(outpoints, unspents[i].OutPoint()) + } + + return &RescanJob{ + Addresses: map[*Account][]btcutil.Address{a: addrs}, + OutPoints: outpoints, + StartHeight: height, + } } func (a *Account) ResendUnminedTxs() { diff --git a/acctmgr.go b/acctmgr.go index a2d0d31..b073ac4 100644 --- a/acctmgr.go +++ b/acctmgr.go @@ -49,8 +49,10 @@ type AccountManager struct { accessAll chan *accessAllRequest add chan *Account remove chan *Account + rescanMsgs chan RescanMsg - ds *DiskSyncer // might move to inside Start + ds *DiskSyncer + rm *RescanManager } // NewAccountManager returns a new AccountManager. @@ -62,22 +64,29 @@ func NewAccountManager() *AccountManager { accessAll: make(chan *accessAllRequest), add: make(chan *Account), remove: make(chan *Account), + rescanMsgs: make(chan RescanMsg, 1), } am.ds = NewDiskSyncer(am) + am.rm = NewRescanManager(am.rescanMsgs) return am } -// Start maintains accounts and structures for quick lookups for account -// information. Access to these structures must be done through with the -// channels in the AccountManger struct fields. This function never returns -// and should be called as a new goroutine. +// Start starts the goroutines required to run the AccountManager. func (am *AccountManager) Start() { // Ready the semaphore - can't grab unless the manager has started. am.bsem <- struct{}{} - // Start the account manager's disk syncer. + go am.accountHandler() + go am.rescanListener() go am.ds.Start() + go am.rm.Start() +} +// accountHandler maintains accounts and structures for quick lookups for +// account information. Access to these structures must be done through +// with the channels in the AccountManger struct fields. This function +// never returns and should be called as a new goroutine. +func (am *AccountManager) accountHandler() { // List and map of all accounts. l := list.New() m := make(map[string]*Account) @@ -134,6 +143,50 @@ func (am *AccountManager) Start() { } } +// rescanListener listens for messages from the rescan manager and marks +// accounts and addresses as synced. +func (am *AccountManager) rescanListener() { + for msg := range am.rescanMsgs { + AcctMgr.Grab() + switch e := msg.(type) { + case *RescanStartedMsg: + // Log the newly-started rescan. + n := 0 + for _, addrs := range e.Addresses { + n += len(addrs) + } + noun := pickNoun(n, "address", "addresses") + log.Infof("Started rescan at height %d for %d %s", e.StartHeight, n, noun) + + case *RescanProgressMsg: + // TODO: mark addresses as partially synced. + + case *RescanFinishedMsg: + if e.Error != nil { + log.Errorf("Rescan failed: %v", e.Error.Message) + break + } + + n := 0 + for acct, addrs := range e.Addresses { + for i := range addrs { + n++ + err := acct.MarkAddressSynced(addrs[i]) + if err != nil { + log.Errorf("Error marking address synced: %v", err) + continue + } + } + AcctMgr.ds.FlushAccount(acct) + } + + noun := pickNoun(n, "address", "addresses") + log.Infof("Finished rescan for %d %s", n, noun) + } + AcctMgr.Release() + } +} + // Grab grabs the account manager's binary semaphore. A custom semaphore // is used instead of a sync.Mutex so the account manager's disk syncer // can grab the semaphore from a select statement. @@ -520,18 +573,28 @@ func (am *AccountManager) ListUnspent(minconf, maxconf int, // RescanActiveAddresses begins a rescan for all active addresses for // each account. -// -// TODO(jrick): batch addresses for all accounts together so multiple -// rescan commands can be avoided. func (am *AccountManager) RescanActiveAddresses() { - for _, account := range am.AllAccounts() { - account.RescanActiveAddresses() + var job *RescanJob + for _, a := range am.AllAccounts() { + acctJob := a.RescanActiveJob() + if job == nil { + job = acctJob + } else { + job.Merge(acctJob) + } } + if job == nil { + return + } + + // Submit merged job and block until rescan completes. + jobFinished := am.rm.SubmitJob(job) + <-jobFinished } func (am *AccountManager) ResendUnminedTxs() { - for _, account := range am.AllAccounts() { - account.ResendUnminedTxs() + for _, a := range am.AllAccounts() { + a.ResendUnminedTxs() } } diff --git a/cmd.go b/cmd.go index b6f407d..3300bb7 100644 --- a/cmd.go +++ b/cmd.go @@ -147,7 +147,7 @@ func main() { updateOldFileLocations() // Start account manager and open accounts. - go AcctMgr.Start() + AcctMgr.Start() AcctMgr.OpenAccounts() // Read CA file to verify a btcd TLS connection. diff --git a/disksync.go b/disksync.go index 97e28e7..92edf68 100644 --- a/disksync.go +++ b/disksync.go @@ -207,12 +207,17 @@ func NewDiskSyncer(am *AccountManager) *DiskSyncer { } } -// Start starts the disk syncer. It manages a set of "dirty" account files +// Start starts the goroutines required to run the DiskSyncer. +func (ds *DiskSyncer) Start() { + go ds.handler() +} + +// handler runs the disk syncer. It manages a set of "dirty" account files // which must be written to disk, and synchronizes all writes in a single // goroutine. Periodic flush operations may be signaled by an AccountManager. // // This never returns and is should be called from a new goroutine. -func (ds *DiskSyncer) Start() { +func (ds *DiskSyncer) handler() { netdir := networkDir(cfg.Net()) if err := checkCreateDir(netdir); err != nil { log.Errorf("Unable to create or write to account directory: %v", err) diff --git a/log.go b/log.go index 798bac3..bd93f6f 100644 --- a/log.go +++ b/log.go @@ -97,3 +97,12 @@ func setLogLevel(logLevel string) []seelog.LoggerInterface { return loggers } + +// pickNoun returns the singular or plural form of a noun depending +// on the count n. +func pickNoun(n int, singular, plural string) string { + if n == 1 { + return singular + } + return plural +} diff --git a/rescan.go b/rescan.go new file mode 100644 index 0000000..f5074ec --- /dev/null +++ b/rescan.go @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2013, 2014 Conformal Systems LLC + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +package main + +import ( + "github.com/conformal/btcjson" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" +) + +// RescanMsg is the interface type for messages sent to the +// RescanManager's message channel. +type RescanMsg interface { + ImplementsRescanMsg() +} + +// RescanStartedMsg reports the job being processed for a new +// rescan. +type RescanStartedMsg RescanJob + +// ImplementsRescanMsg is implemented to satisify the RescanMsg +// interface. +func (r *RescanStartedMsg) ImplementsRescanMsg() {} + +// RescanProgressMsg reports the current progress made by a rescan +// for a set of account's addresses. +type RescanProgressMsg struct { + Addresses map[*Account][]btcutil.Address + Height int32 +} + +// ImplementsRescanMsg is implemented to satisify the RescanMsg +// interface. +func (r *RescanProgressMsg) ImplementsRescanMsg() {} + +// RescanFinishedMsg reports the set of account's addresses of a +// possibly-finished rescan, or an error if the rescan failed. +type RescanFinishedMsg struct { + Addresses map[*Account][]btcutil.Address + Error *btcjson.Error +} + +// ImplementsRescanMsg is implemented to satisify the RescanMsg +// interface. +func (r *RescanFinishedMsg) ImplementsRescanMsg() {} + +// RescanManager manages a set of current and to be processed account's +// addresses, batching waiting jobs together to minimize the total time +// needed to rescan many separate jobs. Rescan requests are processed +// one at a time, and the next batch does not run until the current +// has finished. +type RescanManager struct { + addJob chan *RescanJob + sendJob chan *RescanJob + status chan interface{} // rescanProgress and rescanFinished + msgs chan RescanMsg + jobCompleteChan chan chan struct{} +} + +// NewRescanManager creates a new RescanManger. If msgChan is non-nil, +// rescan messages are sent to the channel for additional processing by +// the caller. +func NewRescanManager(msgChan chan RescanMsg) *RescanManager { + return &RescanManager{ + addJob: make(chan *RescanJob, 1), + sendJob: make(chan *RescanJob, 1), + status: make(chan interface{}, 1), + msgs: msgChan, + jobCompleteChan: make(chan chan struct{}, 1), + } +} + +// Start starts the goroutines to run the RescanManager. +func (m *RescanManager) Start() { + go m.jobHandler() + go m.rpcHandler() +} + +type rescanBatch struct { + addrs map[*Account][]btcutil.Address + outpoints map[btcwire.OutPoint]struct{} + height int32 + complete chan struct{} +} + +func newRescanBatch() *rescanBatch { + return &rescanBatch{ + addrs: map[*Account][]btcutil.Address{}, + outpoints: map[btcwire.OutPoint]struct{}{}, + height: -1, + complete: make(chan struct{}), + } +} + +func (b *rescanBatch) done() { + close(b.complete) +} + +func (b *rescanBatch) empty() bool { + return len(b.addrs) == 0 +} + +func (b *rescanBatch) job() *RescanJob { + // Create slice of outpoint points from the batch's set. + outpoints := make([]*btcwire.OutPoint, 0, len(b.outpoints)) + for outpoint := range b.outpoints { + opCopy := outpoint + outpoints = append(outpoints, &opCopy) + } + + return &RescanJob{ + Addresses: b.addrs, + OutPoints: outpoints, + StartHeight: b.height, + } +} + +func (b *rescanBatch) merge(job *RescanJob) { + for acct, addr := range job.Addresses { + b.addrs[acct] = append(b.addrs[acct], addr...) + } + for _, op := range job.OutPoints { + b.outpoints[*op] = struct{}{} + } + if b.height == -1 || job.StartHeight < b.height { + b.height = job.StartHeight + } +} + +// Status types for the handler. +type rescanProgress int32 +type rescanFinished *btcjson.Error + +// jobHandler runs the RescanManager's for-select loop to manage rescan jobs +// and dispatch requests. +func (m *RescanManager) jobHandler() { + curBatch := newRescanBatch() + nextBatch := newRescanBatch() + + for { + select { + case job := <-m.addJob: + if curBatch.empty() { + // Set current batch as this job and send + // request. + curBatch.merge(job) + m.sendJob <- job + + // Send the channel that is closed when the + // current batch completes. + m.jobCompleteChan <- curBatch.complete + + // Notify listener of a newly-started rescan. + if m.msgs != nil { + m.msgs <- (*RescanStartedMsg)(job) + } + } else { + // Add job to waiting batch. + nextBatch.merge(job) + + // Send the channel that is closed when the + // waiting batch completes. + m.jobCompleteChan <- nextBatch.complete + } + + case status := <-m.status: + switch s := status.(type) { + case rescanProgress: + if m.msgs != nil { + m.msgs <- &RescanProgressMsg{ + Addresses: curBatch.addrs, + Height: int32(s), + } + } + + case rescanFinished: + if m.msgs != nil { + m.msgs <- &RescanFinishedMsg{ + Addresses: curBatch.addrs, + Error: (*btcjson.Error)(s), + } + } + curBatch.done() + + curBatch, nextBatch = nextBatch, newRescanBatch() + + if !curBatch.empty() { + job := curBatch.job() + m.sendJob <- curBatch.job() + if m.msgs != nil { + m.msgs <- (*RescanStartedMsg)(job) + } + } + } + } + } +} + +// rpcHandler reads jobs sent by the jobHandler and sends the rpc requests +// to perform the rescan. New jobs are not read until a rescan finishes. +// The jobHandler is notified when the processing the rescan finishes. +func (m *RescanManager) rpcHandler() { + for job := range m.sendJob { + var addrStrs []string + for _, addrs := range job.Addresses { + for i := range addrs { + addrStrs = append(addrStrs, addrs[i].EncodeAddress()) + } + } + + c := CurrentServerConn() + jsonErr := Rescan(c, job.StartHeight, addrStrs, job.OutPoints) + m.status <- rescanFinished(jsonErr) + } +} + +// RescanJob is a job to be processed by the RescanManager. The job includes +// a set of account's addresses, a starting height to begin the rescan, and +// outpoints spendable by the addresses thought to be unspent. +type RescanJob struct { + Addresses map[*Account][]btcutil.Address + OutPoints []*btcwire.OutPoint + StartHeight int32 +} + +// Merge merges the work from k into j, setting the starting height to +// the minimum of the two jobs. This method does not check for +// duplicate addresses or outpoints. +func (j *RescanJob) Merge(k *RescanJob) { + for acct, addrs := range k.Addresses { + j.Addresses[acct] = append(j.Addresses[acct], addrs...) + } + for _, op := range k.OutPoints { + j.OutPoints = append(j.OutPoints, op) + } + if k.StartHeight < j.StartHeight { + j.StartHeight = k.StartHeight + } +} + +// SubmitJob submits a RescanJob to the RescanManager. A channel is returned +// that is closed once the rescan request for the job completes. +func (m *RescanManager) SubmitJob(job *RescanJob) <-chan struct{} { + m.addJob <- job + return <-m.jobCompleteChan +} + +// MarkProgress messages the RescanManager with the height of the block +// last processed by a running rescan. +func (m *RescanManager) MarkProgress(height int32) { + m.status <- rescanProgress(height) +}