b9fd527d33
This commit is the result of several big changes being made to the wallet. In particular, the "handshake" (initial sync to the chain server) was quite racy and required proper synchronization. To make fixing this race easier, several other changes were made to the internal wallet data structures and much of the RPC server ended up being rewritten. First, all account support has been removed. The previous Account struct has been replaced with a Wallet structure, which includes a keystore for saving keys, and a txstore for storing relevant transactions. This decision has been made since it is the opinion of myself and other developers that bitcoind accounts are fundamentally broken (as accounts implemented by bitcoind support both arbitrary address groupings as well as moving balances between accounts -- these are fundamentally incompatible features), and since a BIP0032 keystore is soon planned to be implemented (at which point, "accounts" can return as HD extended keys). With the keystore handling the grouping of related keys, there is no reason have many different Account structs, and the AccountManager has been removed as well. All RPC handlers that take an account option will only work with "" (the default account) or "*" if the RPC allows specifying all accounts. Second, much of the RPC server has been cleaned up. The global variables for the RPC server and chain server client have been moved to part of the rpcServer struct, and the handlers for each RPC method that are looked up change depending on which components have been set. Passthrough requests are also no longer handled specially, but when the chain server is set, a handler to perform the passthrough will be returned if the method is not otherwise a wallet RPC. The notification system for websocket clients has also been rewritten so wallet components can send notifications through channels, rather than requiring direct access to the RPC server itself, or worse still, sending directly to a websocket client's send channel. In the future, this will enable proper registration of notifications, rather than unsolicited broadcasts to every connected websocket client (see issue #84). Finally, and the main reason why much of this cleanup was necessary, the races during intial sync with the chain server have been fixed. Previously, when the 'Handshake' was run, a rescan would occur which would perform modifications to Account data structures as notifications were received. Synchronization was provided with a single binary semaphore which serialized all access to wallet and account data. However, the Handshake itself was not able to run with this lock (or else notifications would block), and many data races would occur as both notifications were being handled. If GOMAXPROCS was ever increased beyond 1, btcwallet would always immediately crash due to invalid addresses caused by the data races on startup. To fix this, the single lock for all wallet access has been replaced with mutexes for both the keystore and txstore. Handling of btcd notifications and client requests may now occur simultaneously. GOMAXPROCS has also been set to the number of logical CPUs at the beginning of main, since with the data races fixed, there's no reason to prevent the extra parallelism gained by increasing it. Closes #78. Closes #101. Closes #110.
318 lines
8.9 KiB
Go
318 lines
8.9 KiB
Go
/*
|
|
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwallet/chain"
|
|
"github.com/conformal/btcwallet/keystore"
|
|
"github.com/conformal/btcwire"
|
|
)
|
|
|
|
// RescanProgressMsg reports the current progress made by a rescan for a
|
|
// set of wallet addresses.
|
|
type RescanProgressMsg struct {
|
|
Addresses []btcutil.Address
|
|
Notification *chain.RescanProgress
|
|
}
|
|
|
|
// RescanFinishedMsg reports the addresses that were rescanned when a
|
|
// rescanfinished message was received rescanning a batch of addresses.
|
|
type RescanFinishedMsg struct {
|
|
Addresses []btcutil.Address
|
|
Notification *chain.RescanFinished
|
|
WasInitialSync bool
|
|
}
|
|
|
|
// RescanJob is a job to be processed by the RescanManager. The job includes
|
|
// a set of wallet addresses, a starting height to begin the rescan, and
|
|
// outpoints spendable by the addresses thought to be unspent. After the
|
|
// rescan completes, the error result of the rescan RPC is sent on the Err
|
|
// channel.
|
|
type RescanJob struct {
|
|
InitialSync bool
|
|
Addrs []btcutil.Address
|
|
OutPoints []*btcwire.OutPoint
|
|
BlockStamp keystore.BlockStamp
|
|
err chan error
|
|
}
|
|
|
|
// rescanBatch is a collection of one or more RescanJobs that were merged
|
|
// together before a rescan is performed.
|
|
type rescanBatch struct {
|
|
initialSync bool
|
|
addrs []btcutil.Address
|
|
outpoints []*btcwire.OutPoint
|
|
bs keystore.BlockStamp
|
|
errChans []chan error
|
|
}
|
|
|
|
// SubmitRescan submits a RescanJob to the RescanManager. A channel is
|
|
// returned with the final error of the rescan. The channel is buffered
|
|
// and does not need to be read to prevent a deadlock.
|
|
func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error {
|
|
errChan := make(chan error, 1)
|
|
job.err = errChan
|
|
w.rescanAddJob <- job
|
|
return errChan
|
|
}
|
|
|
|
// batch creates the rescanBatch for a single rescan job.
|
|
func (job *RescanJob) batch() *rescanBatch {
|
|
return &rescanBatch{
|
|
initialSync: job.InitialSync,
|
|
addrs: job.Addrs,
|
|
outpoints: job.OutPoints,
|
|
bs: job.BlockStamp,
|
|
errChans: []chan error{job.err},
|
|
}
|
|
}
|
|
|
|
// merge merges the work from k into j, setting the starting height to
|
|
// the minimum of the two jobs. This method does not check for
|
|
// duplicate addresses or outpoints.
|
|
func (b *rescanBatch) merge(job *RescanJob) {
|
|
if job.InitialSync {
|
|
b.initialSync = true
|
|
}
|
|
b.addrs = append(b.addrs, job.Addrs...)
|
|
b.outpoints = append(b.outpoints, job.OutPoints...)
|
|
if job.BlockStamp.Height < b.bs.Height {
|
|
b.bs = job.BlockStamp
|
|
}
|
|
b.errChans = append(b.errChans, job.err)
|
|
}
|
|
|
|
// done iterates through all error channels, duplicating sending the error
|
|
// to inform callers that the rescan finished (or could not complete due
|
|
// to an error).
|
|
func (b *rescanBatch) done(err error) {
|
|
for _, c := range b.errChans {
|
|
c <- err
|
|
}
|
|
}
|
|
|
|
// rescanBatchHandler handles incoming rescan request, serializing rescan
|
|
// submissions, and possibly batching many waiting requests together so they
|
|
// can be handled by a single rescan after the current one completes.
|
|
func (w *Wallet) rescanBatchHandler() {
|
|
var curBatch, nextBatch *rescanBatch
|
|
|
|
out:
|
|
for {
|
|
select {
|
|
case job := <-w.rescanAddJob:
|
|
if curBatch == nil {
|
|
// Set current batch as this job and send
|
|
// request.
|
|
curBatch = job.batch()
|
|
w.rescanBatch <- curBatch
|
|
} else {
|
|
// Create next batch if it doesn't exist, or
|
|
// merge the job.
|
|
if nextBatch == nil {
|
|
nextBatch = job.batch()
|
|
} else {
|
|
nextBatch.merge(job)
|
|
}
|
|
}
|
|
|
|
case n := <-w.rescanNotifications:
|
|
switch n := n.(type) {
|
|
case *chain.RescanProgress:
|
|
w.rescanProgress <- &RescanProgressMsg{
|
|
Addresses: curBatch.addrs,
|
|
Notification: n,
|
|
}
|
|
|
|
case *chain.RescanFinished:
|
|
if curBatch == nil {
|
|
log.Warnf("Received rescan finished " +
|
|
"notification but no rescan " +
|
|
"currently running")
|
|
continue
|
|
}
|
|
w.rescanFinished <- &RescanFinishedMsg{
|
|
Addresses: curBatch.addrs,
|
|
Notification: n,
|
|
WasInitialSync: curBatch.initialSync,
|
|
}
|
|
|
|
curBatch, nextBatch = nextBatch, nil
|
|
|
|
if curBatch != nil {
|
|
w.rescanBatch <- curBatch
|
|
}
|
|
|
|
default:
|
|
// Unexpected message
|
|
panic(n)
|
|
}
|
|
|
|
case <-w.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
close(w.rescanBatch)
|
|
w.wg.Done()
|
|
}
|
|
|
|
// rescanProgressHandler handles notifications for paritally and fully completed
|
|
// rescans by marking each rescanned address as partially or fully synced and
|
|
// writing the keystore back to disk.
|
|
func (w *Wallet) rescanProgressHandler() {
|
|
out:
|
|
for {
|
|
// These can't be processed out of order since both chans are
|
|
// unbuffured and are sent from same context (the batch
|
|
// handler).
|
|
select {
|
|
case msg := <-w.rescanProgress:
|
|
n := msg.Notification
|
|
log.Infof("Rescanned through block %v (height %d)",
|
|
n.Hash, n.Height)
|
|
|
|
// TODO(jrick): save partial syncs should also include
|
|
// the block hash.
|
|
for _, addr := range msg.Addresses {
|
|
err := w.KeyStore.SetSyncStatus(addr,
|
|
keystore.PartialSync(n.Height))
|
|
if err != nil {
|
|
log.Errorf("Error marking address %v "+
|
|
"partially synced: %v", addr, err)
|
|
}
|
|
}
|
|
w.KeyStore.MarkDirty()
|
|
err := w.KeyStore.WriteIfDirty()
|
|
if err != nil {
|
|
log.Errorf("Could not write partial rescan "+
|
|
"progress to keystore: %v", err)
|
|
}
|
|
|
|
case msg := <-w.rescanFinished:
|
|
n := msg.Notification
|
|
addrs := msg.Addresses
|
|
noun := pickNoun(len(addrs), "address", "addresses")
|
|
if msg.WasInitialSync {
|
|
w.Track()
|
|
w.ResendUnminedTxs()
|
|
|
|
bs := keystore.BlockStamp{
|
|
Hash: n.Hash,
|
|
Height: n.Height,
|
|
}
|
|
w.KeyStore.SetSyncedWith(&bs)
|
|
w.notifyConnectedBlock(bs)
|
|
|
|
// Mark wallet as synced to chain so connected
|
|
// and disconnected block notifications are
|
|
// processed.
|
|
close(w.chainSynced)
|
|
}
|
|
log.Infof("Finished rescan for %d %s (synced to block "+
|
|
"%s, height %d)", len(addrs), noun, n.Hash,
|
|
n.Height)
|
|
|
|
for _, addr := range addrs {
|
|
err := w.KeyStore.SetSyncStatus(addr,
|
|
keystore.FullSync{})
|
|
if err != nil {
|
|
log.Errorf("Error marking address %v "+
|
|
"fully synced: %v", addr, err)
|
|
}
|
|
}
|
|
w.KeyStore.MarkDirty()
|
|
err := w.KeyStore.WriteIfDirty()
|
|
if err != nil {
|
|
log.Errorf("Could not write finished rescan "+
|
|
"progress to keystore: %v", err)
|
|
}
|
|
|
|
case <-w.quit:
|
|
break out
|
|
}
|
|
}
|
|
w.wg.Done()
|
|
}
|
|
|
|
// rescanRPCHandler reads batch jobs sent by rescanBatchHandler and sends the
|
|
// RPC requests to perform a rescan. New jobs are not read until a rescan
|
|
// finishes.
|
|
func (w *Wallet) rescanRPCHandler() {
|
|
for batch := range w.rescanBatch {
|
|
// Log the newly-started rescan.
|
|
numAddrs := len(batch.addrs)
|
|
noun := pickNoun(numAddrs, "address", "addresses")
|
|
log.Infof("Started rescan from block %v (height %d) for %d %s",
|
|
batch.bs.Hash, batch.bs.Height, numAddrs, noun)
|
|
|
|
err := w.chainSvr.Rescan(batch.bs.Hash, batch.addrs,
|
|
batch.outpoints)
|
|
if err != nil {
|
|
log.Errorf("Rescan for %d %s failed: %v", numAddrs,
|
|
noun, err)
|
|
}
|
|
batch.done(err)
|
|
}
|
|
w.wg.Done()
|
|
}
|
|
|
|
// RescanActiveAddresses begins a rescan for all active addresses of a
|
|
// wallet. This is intended to be used to sync a wallet back up to the
|
|
// current best block in the main chain, and is considered an intial sync
|
|
// rescan.
|
|
func (w *Wallet) RescanActiveAddresses() (err error) {
|
|
// Determine the block necesary to start the rescan for all active
|
|
// addresses.
|
|
hash, height := w.KeyStore.SyncedTo()
|
|
if hash == nil {
|
|
// TODO: fix our "synced to block" handling (either in
|
|
// keystore or txstore, or elsewhere) so this *always*
|
|
// returns the block hash. Looking it up by height is
|
|
// asking for problems.
|
|
hash, err = w.chainSvr.GetBlockHash(int64(height))
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
actives := w.KeyStore.SortedActiveAddresses()
|
|
addrs := make([]btcutil.Address, len(actives))
|
|
for i, addr := range actives {
|
|
addrs[i] = addr.Address()
|
|
}
|
|
|
|
unspents, err := w.TxStore.UnspentOutputs()
|
|
if err != nil {
|
|
return
|
|
}
|
|
outpoints := make([]*btcwire.OutPoint, len(unspents))
|
|
for i, output := range unspents {
|
|
outpoints[i] = output.OutPoint()
|
|
}
|
|
|
|
job := &RescanJob{
|
|
InitialSync: true,
|
|
Addrs: addrs,
|
|
OutPoints: outpoints,
|
|
BlockStamp: keystore.BlockStamp{Hash: hash, Height: height},
|
|
}
|
|
|
|
// Submit merged job and block until rescan completes.
|
|
return <-w.SubmitRescan(job)
|
|
}
|