lbcwallet/wallet/rescan.go
Josh Rickmar 497ffc11f0 Modernize the RPC server.
This is a rather monolithic commit that moves the old RPC server to
its own package (rpc/legacyrpc), introduces a new RPC server using
gRPC (rpc/rpcserver), and provides the ability to defer wallet loading
until request at a later time by an RPC (--noinitialload).

The legacy RPC server remains the default for now while the new gRPC
server is not enabled by default.  Enabling the new server requires
setting a listen address (--experimenalrpclisten).  This experimental
flag is used to effectively feature gate the server until it is ready
to use as a default.  Both RPC servers can be run at the same time,
but require binding to different listen addresses.

In theory, with the legacy RPC server now living in its own package it
should become much easier to unit test the handlers.  This will be
useful for any future changes to the package, as compatibility with
Core's wallet is still desired.

Type safety has also been improved in the legacy RPC server.  Multiple
handler types are now used for methods that do and do not require the
RPC client as a dependency.  This can statically help prevent nil
pointer dereferences, and was very useful for catching bugs during
refactoring.

To synchronize the wallet loading process between the main package
(the default) and through the gRPC WalletLoader service (with the
--noinitialload option), as well as increasing the loose coupling of
packages, a new wallet.Loader type has been added.  All creating and
loading of existing wallets is done through a single Loader instance,
and callbacks can be attached to the instance to run after the wallet
has been opened.  This is how the legacy RPC server is associated with
a loaded wallet, even after the wallet is loaded by a gRPC method in a
completely unrelated package.

Documentation for the new RPC server has been added to the
rpc/documentation directory.  The documentation includes a
specification for the new RPC API, addresses how to make changes to
the server implementation, and provides short example clients in
several different languages.

Some of the new RPC methods are not implementated exactly as described
by the specification.  These are considered bugs with the
implementation, not the spec.  Known bugs are commented as such.
2016-01-29 11:18:26 -05:00

294 lines
8.2 KiB
Go

/*
* Copyright (c) 2013, 2014 The btcsuite developers
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package wallet
import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
)
// RescanProgressMsg reports the current progress made by a rescan for a
// set of wallet addresses.
type RescanProgressMsg struct {
Addresses []btcutil.Address
Notification *chain.RescanProgress
}
// RescanFinishedMsg reports the addresses that were rescanned when a
// rescanfinished message was received rescanning a batch of addresses.
type RescanFinishedMsg struct {
Addresses []btcutil.Address
Notification *chain.RescanFinished
}
// RescanJob is a job to be processed by the RescanManager. The job includes
// a set of wallet addresses, a starting height to begin the rescan, and
// outpoints spendable by the addresses thought to be unspent. After the
// rescan completes, the error result of the rescan RPC is sent on the Err
// channel.
type RescanJob struct {
InitialSync bool
Addrs []btcutil.Address
OutPoints []*wire.OutPoint
BlockStamp waddrmgr.BlockStamp
err chan error
}
// rescanBatch is a collection of one or more RescanJobs that were merged
// together before a rescan is performed.
type rescanBatch struct {
initialSync bool
addrs []btcutil.Address
outpoints []*wire.OutPoint
bs waddrmgr.BlockStamp
errChans []chan error
}
// SubmitRescan submits a RescanJob to the RescanManager. A channel is
// returned with the final error of the rescan. The channel is buffered
// and does not need to be read to prevent a deadlock.
func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error {
errChan := make(chan error, 1)
job.err = errChan
w.rescanAddJob <- job
return errChan
}
// batch creates the rescanBatch for a single rescan job.
func (job *RescanJob) batch() *rescanBatch {
return &rescanBatch{
initialSync: job.InitialSync,
addrs: job.Addrs,
outpoints: job.OutPoints,
bs: job.BlockStamp,
errChans: []chan error{job.err},
}
}
// merge merges the work from k into j, setting the starting height to
// the minimum of the two jobs. This method does not check for
// duplicate addresses or outpoints.
func (b *rescanBatch) merge(job *RescanJob) {
if job.InitialSync {
b.initialSync = true
}
b.addrs = append(b.addrs, job.Addrs...)
b.outpoints = append(b.outpoints, job.OutPoints...)
if job.BlockStamp.Height < b.bs.Height {
b.bs = job.BlockStamp
}
b.errChans = append(b.errChans, job.err)
}
// done iterates through all error channels, duplicating sending the error
// to inform callers that the rescan finished (or could not complete due
// to an error).
func (b *rescanBatch) done(err error) {
for _, c := range b.errChans {
c <- err
}
}
// rescanBatchHandler handles incoming rescan request, serializing rescan
// submissions, and possibly batching many waiting requests together so they
// can be handled by a single rescan after the current one completes.
func (w *Wallet) rescanBatchHandler() {
var curBatch, nextBatch *rescanBatch
quit := w.quitChan()
out:
for {
select {
case job := <-w.rescanAddJob:
if curBatch == nil {
// Set current batch as this job and send
// request.
curBatch = job.batch()
w.rescanBatch <- curBatch
} else {
// Create next batch if it doesn't exist, or
// merge the job.
if nextBatch == nil {
nextBatch = job.batch()
} else {
nextBatch.merge(job)
}
}
case n := <-w.rescanNotifications:
switch n := n.(type) {
case *chain.RescanProgress:
w.rescanProgress <- &RescanProgressMsg{
Addresses: curBatch.addrs,
Notification: n,
}
case *chain.RescanFinished:
if curBatch == nil {
log.Warnf("Received rescan finished " +
"notification but no rescan " +
"currently running")
continue
}
w.rescanFinished <- &RescanFinishedMsg{
Addresses: curBatch.addrs,
Notification: n,
}
curBatch, nextBatch = nextBatch, nil
if curBatch != nil {
w.rescanBatch <- curBatch
}
default:
// Unexpected message
panic(n)
}
case <-quit:
break out
}
}
w.wg.Done()
}
// rescanProgressHandler handles notifications for partially and fully completed
// rescans by marking each rescanned address as partially or fully synced.
func (w *Wallet) rescanProgressHandler() {
quit := w.quitChan()
out:
for {
// These can't be processed out of order since both chans are
// unbuffured and are sent from same context (the batch
// handler).
select {
case msg := <-w.rescanProgress:
n := msg.Notification
log.Infof("Rescanned through block %v (height %d)",
n.Hash, n.Height)
bs := waddrmgr.BlockStamp{
Hash: *n.Hash,
Height: n.Height,
}
if err := w.Manager.SetSyncedTo(&bs); err != nil {
log.Errorf("Failed to update address manager "+
"sync state for hash %v (height %d): %v",
n.Hash, n.Height, err)
}
case msg := <-w.rescanFinished:
n := msg.Notification
addrs := msg.Addresses
noun := pickNoun(len(addrs), "address", "addresses")
log.Infof("Finished rescan for %d %s (synced to block "+
"%s, height %d)", len(addrs), noun, n.Hash,
n.Height)
bs := waddrmgr.BlockStamp{n.Height, *n.Hash}
if err := w.Manager.SetSyncedTo(&bs); err != nil {
log.Errorf("Failed to update address manager "+
"sync state for hash %v (height %d): %v",
n.Hash, n.Height, err)
}
w.SetChainSynced(true)
go w.ResendUnminedTxs()
// TODO(jrick): The current websocket API requires
// notifying the block the rescan synced through to
// every connected client. This is code smell and
// should be removed or replaced with a more
// appropiate notification when the API is redone.
b := wtxmgr.BlockMeta{
Block: wtxmgr.Block{
*n.Hash,
n.Height,
},
Time: n.Time,
}
w.notifyConnectedBlock(b)
case <-quit:
break out
}
}
w.wg.Done()
}
// rescanRPCHandler reads batch jobs sent by rescanBatchHandler and sends the
// RPC requests to perform a rescan. New jobs are not read until a rescan
// finishes.
func (w *Wallet) rescanRPCHandler() {
chainClient, err := w.requireChainClient()
if err != nil {
log.Errorf("rescanRPCHandler called without an RPC client")
w.wg.Done()
return
}
quit := w.quitChan()
out:
for {
select {
case batch := <-w.rescanBatch:
// Log the newly-started rescan.
numAddrs := len(batch.addrs)
noun := pickNoun(numAddrs, "address", "addresses")
log.Infof("Started rescan from block %v (height %d) for %d %s",
batch.bs.Hash, batch.bs.Height, numAddrs, noun)
err := chainClient.Rescan(&batch.bs.Hash, batch.addrs,
batch.outpoints)
if err != nil {
log.Errorf("Rescan for %d %s failed: %v", numAddrs,
noun, err)
}
batch.done(err)
case <-quit:
break out
}
}
w.wg.Done()
}
// Rescan begins a rescan for all active addresses and unspent outputs of
// a wallet. This is intended to be used to sync a wallet back up to the
// current best block in the main chain, and is considered an initial sync
// rescan.
func (w *Wallet) Rescan(addrs []btcutil.Address, unspent []wtxmgr.Credit) error {
outpoints := make([]*wire.OutPoint, len(unspent))
for i, output := range unspent {
outpoints[i] = &output.OutPoint
}
job := &RescanJob{
InitialSync: true,
Addrs: addrs,
OutPoints: outpoints,
BlockStamp: w.Manager.SyncedTo(),
}
// Submit merged job and block until rescan completes.
return <-w.SubmitRescan(job)
}