lbcwallet/rescan.go
Josh Rickmar b9fd527d33 Remove account support, fix races on btcd connect.
This commit is the result of several big changes being made to the
wallet.  In particular, the "handshake" (initial sync to the chain
server) was quite racy and required proper synchronization.  To make
fixing this race easier, several other changes were made to the
internal wallet data structures and much of the RPC server ended up
being rewritten.

First, all account support has been removed.  The previous Account
struct has been replaced with a Wallet structure, which includes a
keystore for saving keys, and a txstore for storing relevant
transactions.  This decision has been made since it is the opinion of
myself and other developers that bitcoind accounts are fundamentally
broken (as accounts implemented by bitcoind support both arbitrary
address groupings as well as moving balances between accounts -- these
are fundamentally incompatible features), and since a BIP0032 keystore
is soon planned to be implemented (at which point, "accounts" can
return as HD extended keys).  With the keystore handling the grouping
of related keys, there is no reason to have many different Account
structs, and the AccountManager has been removed as well.  All RPC
handlers that take an account option will only work with "" (the
default account) or "*" if the RPC allows specifying all accounts.

Second, much of the RPC server has been cleaned up.  The global
variables for the RPC server and chain server client have been moved
to part of the rpcServer struct, and the handlers for each RPC method
that are looked up change depending on which components have been set.
Passthrough requests are also no longer handled specially, but when
the chain server is set, a handler to perform the passthrough will be
returned if the method is not otherwise a wallet RPC.  The
notification system for websocket clients has also been rewritten so
wallet components can send notifications through channels, rather than
requiring direct access to the RPC server itself, or worse still,
sending directly to a websocket client's send channel.  In the future,
this will enable proper registration of notifications, rather than
unsolicited broadcasts to every connected websocket client (see
issue #84).

Finally, and the main reason why much of this cleanup was necessary,
the races during initial sync with the chain server have been fixed.
Previously, when the 'Handshake' was run, a rescan would occur which
would perform modifications to Account data structures as
notifications were received.  Synchronization was provided with a
single binary semaphore which serialized all access to wallet and
account data.  However, the Handshake itself was not able to run with
this lock (or else notifications would block), and many data races
would occur as the Handshake and notification handling ran
concurrently.  If GOMAXPROCS
was ever increased beyond 1, btcwallet would always immediately crash
due to invalid addresses caused by the data races on startup.  To fix
this, the single lock for all wallet access has been replaced with
mutexes for both the keystore and txstore.  Handling of btcd
notifications and client requests may now occur simultaneously.
GOMAXPROCS has also been set to the number of logical CPUs at the
beginning of main, since with the data races fixed, there's no reason
to prevent the extra parallelism gained by increasing it.

Closes #78.

Closes #101.

Closes #110.
2014-07-25 13:26:14 -05:00


/*
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package main
import (
"github.com/conformal/btcutil"
"github.com/conformal/btcwallet/chain"
"github.com/conformal/btcwallet/keystore"
"github.com/conformal/btcwire"
)
// RescanProgressMsg reports the current progress made by a rescan for a
// set of wallet addresses.
type RescanProgressMsg struct {
Addresses []btcutil.Address
Notification *chain.RescanProgress
}
// RescanFinishedMsg reports the addresses that were rescanned when a
// rescanfinished message was received after rescanning a batch of addresses.
type RescanFinishedMsg struct {
Addresses []btcutil.Address
Notification *chain.RescanFinished
WasInitialSync bool
}
// RescanJob is a job to be processed by the RescanManager. The job includes
// a set of wallet addresses, a starting height to begin the rescan, and
// outpoints spendable by the addresses thought to be unspent. After the
// rescan completes, the error result of the rescan RPC is sent on the err
// channel.
type RescanJob struct {
InitialSync bool
Addrs []btcutil.Address
OutPoints []*btcwire.OutPoint
BlockStamp keystore.BlockStamp
err chan error
}
// rescanBatch is a collection of one or more RescanJobs that were merged
// together before a rescan is performed.
type rescanBatch struct {
initialSync bool
addrs []btcutil.Address
outpoints []*btcwire.OutPoint
bs keystore.BlockStamp
errChans []chan error
}
// SubmitRescan submits a RescanJob to the RescanManager. A channel is
// returned with the final error of the rescan. The channel is buffered
// and does not need to be read to prevent a deadlock.
func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error {
errChan := make(chan error, 1)
job.err = errChan
w.rescanAddJob <- job
return errChan
}
// batch creates the rescanBatch for a single rescan job.
func (job *RescanJob) batch() *rescanBatch {
return &rescanBatch{
initialSync: job.InitialSync,
addrs: job.Addrs,
outpoints: job.OutPoints,
bs: job.BlockStamp,
errChans: []chan error{job.err},
}
}
// merge merges the work from job into the batch b, setting the starting
// block stamp to whichever of the two has the lower height. This method does
// not check for duplicate addresses or outpoints.
func (b *rescanBatch) merge(job *RescanJob) {
if job.InitialSync {
b.initialSync = true
}
b.addrs = append(b.addrs, job.Addrs...)
b.outpoints = append(b.outpoints, job.OutPoints...)
if job.BlockStamp.Height < b.bs.Height {
b.bs = job.BlockStamp
}
b.errChans = append(b.errChans, job.err)
}
// done iterates through all error channels, sending the error to each
// to inform callers that the rescan finished (or could not complete due
// to an error).
func (b *rescanBatch) done(err error) {
for _, c := range b.errChans {
c <- err
}
}
// rescanBatchHandler handles incoming rescan requests, serializing rescan
// submissions, and possibly batching many waiting requests together so they
// can be handled by a single rescan after the current one completes.
func (w *Wallet) rescanBatchHandler() {
var curBatch, nextBatch *rescanBatch
out:
for {
select {
case job := <-w.rescanAddJob:
if curBatch == nil {
// Set current batch as this job and send
// request.
curBatch = job.batch()
w.rescanBatch <- curBatch
} else {
// Create next batch if it doesn't exist, or
// merge the job.
if nextBatch == nil {
nextBatch = job.batch()
} else {
nextBatch.merge(job)
}
}
case n := <-w.rescanNotifications:
switch n := n.(type) {
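case *chain.RescanProgress:
// Guard against a progress notification arriving when no
// batch is active, which would otherwise dereference nil.
if curBatch == nil {
log.Warnf("Received rescan progress " +
"notification but no rescan " +
"currently running")
continue
}
w.rescanProgress <- &RescanProgressMsg{
Addresses: curBatch.addrs,
Notification: n,
}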
case *chain.RescanFinished:
if curBatch == nil {
log.Warnf("Received rescan finished " +
"notification but no rescan " +
"currently running")
continue
}
w.rescanFinished <- &RescanFinishedMsg{
Addresses: curBatch.addrs,
Notification: n,
WasInitialSync: curBatch.initialSync,
}
curBatch, nextBatch = nextBatch, nil
if curBatch != nil {
w.rescanBatch <- curBatch
}
default:
// Unexpected message
panic(n)
}
case <-w.quit:
break out
}
}
close(w.rescanBatch)
w.wg.Done()
}
// rescanProgressHandler handles notifications for partially and fully completed
// rescans by marking each rescanned address as partially or fully synced and
// writing the keystore back to disk.
func (w *Wallet) rescanProgressHandler() {
out:
for {
// These can't be processed out of order since both chans are
// unbuffered and are sent from the same context (the batch
// handler).
select {
case msg := <-w.rescanProgress:
n := msg.Notification
log.Infof("Rescanned through block %v (height %d)",
n.Hash, n.Height)
// TODO(jrick): save partial syncs should also include
// the block hash.
for _, addr := range msg.Addresses {
err := w.KeyStore.SetSyncStatus(addr,
keystore.PartialSync(n.Height))
if err != nil {
log.Errorf("Error marking address %v "+
"partially synced: %v", addr, err)
}
}
w.KeyStore.MarkDirty()
err := w.KeyStore.WriteIfDirty()
if err != nil {
log.Errorf("Could not write partial rescan "+
"progress to keystore: %v", err)
}
case msg := <-w.rescanFinished:
n := msg.Notification
addrs := msg.Addresses
noun := pickNoun(len(addrs), "address", "addresses")
if msg.WasInitialSync {
w.Track()
w.ResendUnminedTxs()
bs := keystore.BlockStamp{
Hash: n.Hash,
Height: n.Height,
}
w.KeyStore.SetSyncedWith(&bs)
w.notifyConnectedBlock(bs)
// Mark wallet as synced to chain so connected
// and disconnected block notifications are
// processed.
close(w.chainSynced)
}
log.Infof("Finished rescan for %d %s (synced to block "+
"%s, height %d)", len(addrs), noun, n.Hash,
n.Height)
for _, addr := range addrs {
err := w.KeyStore.SetSyncStatus(addr,
keystore.FullSync{})
if err != nil {
log.Errorf("Error marking address %v "+
"fully synced: %v", addr, err)
}
}
w.KeyStore.MarkDirty()
err := w.KeyStore.WriteIfDirty()
if err != nil {
log.Errorf("Could not write finished rescan "+
"progress to keystore: %v", err)
}
case <-w.quit:
break out
}
}
w.wg.Done()
}
// rescanRPCHandler reads batch jobs sent by rescanBatchHandler and sends the
// RPC requests to perform a rescan. New jobs are not read until a rescan
// finishes.
func (w *Wallet) rescanRPCHandler() {
for batch := range w.rescanBatch {
// Log the newly-started rescan.
numAddrs := len(batch.addrs)
noun := pickNoun(numAddrs, "address", "addresses")
log.Infof("Started rescan from block %v (height %d) for %d %s",
batch.bs.Hash, batch.bs.Height, numAddrs, noun)
err := w.chainSvr.Rescan(batch.bs.Hash, batch.addrs,
batch.outpoints)
if err != nil {
log.Errorf("Rescan for %d %s failed: %v", numAddrs,
noun, err)
}
batch.done(err)
}
w.wg.Done()
}
// RescanActiveAddresses begins a rescan for all active addresses of a
// wallet. This is intended to be used to sync a wallet back up to the
// current best block in the main chain, and is considered an initial sync
// rescan.
func (w *Wallet) RescanActiveAddresses() (err error) {
// Determine the block necessary to start the rescan for all active
// addresses.
hash, height := w.KeyStore.SyncedTo()
if hash == nil {
// TODO: fix our "synced to block" handling (either in
// keystore or txstore, or elsewhere) so this *always*
// returns the block hash. Looking it up by height is
// asking for problems.
hash, err = w.chainSvr.GetBlockHash(int64(height))
if err != nil {
return
}
}
actives := w.KeyStore.SortedActiveAddresses()
addrs := make([]btcutil.Address, len(actives))
for i, addr := range actives {
addrs[i] = addr.Address()
}
unspents, err := w.TxStore.UnspentOutputs()
if err != nil {
return
}
outpoints := make([]*btcwire.OutPoint, len(unspents))
for i, output := range unspents {
outpoints[i] = output.OutPoint()
}
job := &RescanJob{
InitialSync: true,
Addrs: addrs,
OutPoints: outpoints,
BlockStamp: keystore.BlockStamp{Hash: hash, Height: height},
}
// Submit merged job and block until rescan completes.
return <-w.SubmitRescan(job)
}
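
The batching behavior of `batch`, `merge`, and `done` above can be tried in isolation with a self-contained sketch. The `job`, `batch`, and `mergeJobs` names below are hypothetical simplifications (string addresses and a bare height instead of `btcutil.Address` and `keystore.BlockStamp`), not the wallet's actual types. The sketch shows that merged jobs share one address list, that the batch starts from the lowest requested height, and that every submitter receives the final result on its own buffered channel.

```go
package main

import "fmt"

// job is a simplified, hypothetical stand-in for RescanJob.
type job struct {
	addrs  []string
	height int32
	err    chan error
}

// batch is a simplified, hypothetical stand-in for rescanBatch.
type batch struct {
	addrs    []string
	height   int32
	errChans []chan error
}

// newBatch creates a batch holding the work of a single job.
func newBatch(j *job) *batch {
	return &batch{
		addrs:    append([]string(nil), j.addrs...),
		height:   j.height,
		errChans: []chan error{j.err},
	}
}

// merge folds another job into the batch, keeping the lower start height.
// Duplicate addresses are not removed.
func (b *batch) merge(j *job) {
	b.addrs = append(b.addrs, j.addrs...)
	if j.height < b.height {
		b.height = j.height
	}
	b.errChans = append(b.errChans, j.err)
}

// done reports the rescan result to every job that was merged in.
func (b *batch) done(err error) {
	for _, c := range b.errChans {
		c <- err
	}
}

// mergeJobs combines all jobs into one batch, or returns nil for no jobs.
func mergeJobs(jobs []*job) *batch {
	if len(jobs) == 0 {
		return nil
	}
	b := newBatch(jobs[0])
	for _, j := range jobs[1:] {
		b.merge(j)
	}
	return b
}

func main() {
	jobs := []*job{
		{addrs: []string{"a"}, height: 300, err: make(chan error, 1)},
		{addrs: []string{"b", "c"}, height: 120, err: make(chan error, 1)},
	}
	b := mergeJobs(jobs)
	b.done(nil) // does not block: each channel has capacity 1
	fmt.Println(len(b.addrs), b.height, len(b.errChans)) // prints: 3 120 2
}
```

Because each error channel is buffered with capacity one, as in `SubmitRescan`, `done` can deliver the result even when a caller never reads it.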