b145868a4b
Closes #69.
298 lines
8 KiB
Go
298 lines
8 KiB
Go
/*
|
|
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwire"
|
|
)
|
|
|
|
// RescanMsg is the interface type for messages sent to the
|
|
// RescanManager's message channel.
|
|
type RescanMsg interface {
|
|
ImplementsRescanMsg()
|
|
}
|
|
|
|
// RescanStartedMsg reports the job being processed for a new
|
|
// rescan.
|
|
type RescanStartedMsg RescanJob
|
|
|
|
// ImplementsRescanMsg is implemented to satisify the RescanMsg
|
|
// interface.
|
|
func (r *RescanStartedMsg) ImplementsRescanMsg() {}
|
|
|
|
// RescanProgressMsg reports the current progress made by a rescan
|
|
// for a set of account's addresses.
|
|
type RescanProgressMsg struct {
|
|
Addresses map[*Account][]btcutil.Address
|
|
Height int32
|
|
}
|
|
|
|
// ImplementsRescanMsg is implemented to satisify the RescanMsg
|
|
// interface.
|
|
func (r *RescanProgressMsg) ImplementsRescanMsg() {}
|
|
|
|
// RescanFinishedMsg reports the set of account's addresses of a
|
|
// possibly-finished rescan, or an error if the rescan failed.
|
|
type RescanFinishedMsg struct {
|
|
Addresses map[*Account][]btcutil.Address
|
|
Error error
|
|
}
|
|
|
|
// ImplementsRescanMsg is implemented to satisify the RescanMsg
|
|
// interface.
|
|
func (r *RescanFinishedMsg) ImplementsRescanMsg() {}
|
|
|
|
// RescanManager manages a set of current and to be processed account's
|
|
// addresses, batching waiting jobs together to minimize the total time
|
|
// needed to rescan many separate jobs. Rescan requests are processed
|
|
// one at a time, and the next batch does not run until the current
|
|
// has finished.
|
|
type RescanManager struct {
|
|
addJob chan *RescanJob
|
|
sendJob chan *RescanJob
|
|
status chan interface{} // rescanProgress and rescanFinished
|
|
msgs chan RescanMsg
|
|
jobCompleteChan chan chan struct{}
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
}
|
|
|
|
// NewRescanManager creates a new RescanManger. If msgChan is non-nil,
|
|
// rescan messages are sent to the channel for additional processing by
|
|
// the caller.
|
|
func NewRescanManager(msgChan chan RescanMsg) *RescanManager {
|
|
return &RescanManager{
|
|
addJob: make(chan *RescanJob, 1),
|
|
sendJob: make(chan *RescanJob, 1),
|
|
status: make(chan interface{}, 1),
|
|
msgs: msgChan,
|
|
jobCompleteChan: make(chan chan struct{}, 1),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start starts the goroutines to run the RescanManager.
|
|
func (m *RescanManager) Start() {
|
|
m.wg.Add(2)
|
|
go m.jobHandler()
|
|
go m.rpcHandler()
|
|
}
|
|
|
|
func (m *RescanManager) Stop() {
|
|
close(m.quit)
|
|
}
|
|
|
|
func (m *RescanManager) WaitForShutdown() {
|
|
m.wg.Wait()
|
|
}
|
|
|
|
type rescanBatch struct {
|
|
addrs map[*Account][]btcutil.Address
|
|
outpoints map[btcwire.OutPoint]struct{}
|
|
height int32
|
|
complete chan struct{}
|
|
}
|
|
|
|
func newRescanBatch() *rescanBatch {
|
|
return &rescanBatch{
|
|
addrs: map[*Account][]btcutil.Address{},
|
|
outpoints: map[btcwire.OutPoint]struct{}{},
|
|
height: -1,
|
|
complete: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (b *rescanBatch) done() {
|
|
close(b.complete)
|
|
}
|
|
|
|
func (b *rescanBatch) empty() bool {
|
|
return len(b.addrs) == 0
|
|
}
|
|
|
|
func (b *rescanBatch) job() *RescanJob {
|
|
// Create slice of outpoints from the batch's set.
|
|
outpoints := make([]*btcwire.OutPoint, 0, len(b.outpoints))
|
|
for outpoint := range b.outpoints {
|
|
opCopy := outpoint
|
|
outpoints = append(outpoints, &opCopy)
|
|
}
|
|
|
|
return &RescanJob{
|
|
Addresses: b.addrs,
|
|
OutPoints: outpoints,
|
|
StartHeight: b.height,
|
|
}
|
|
}
|
|
|
|
func (b *rescanBatch) merge(job *RescanJob) {
|
|
for acct, addr := range job.Addresses {
|
|
b.addrs[acct] = append(b.addrs[acct], addr...)
|
|
}
|
|
for _, op := range job.OutPoints {
|
|
b.outpoints[*op] = struct{}{}
|
|
}
|
|
if b.height == -1 || job.StartHeight < b.height {
|
|
b.height = job.StartHeight
|
|
}
|
|
}
|
|
|
|
// jobHandler runs the RescanManager's for-select loop to manage rescan jobs
|
|
// and dispatch requests.
|
|
func (m *RescanManager) jobHandler() {
|
|
curBatch := newRescanBatch()
|
|
nextBatch := newRescanBatch()
|
|
|
|
out:
|
|
for {
|
|
select {
|
|
case job := <-m.addJob:
|
|
if curBatch.empty() {
|
|
// Set current batch as this job and send
|
|
// request.
|
|
curBatch.merge(job)
|
|
m.sendJob <- job
|
|
|
|
// Send the channel that is closed when the
|
|
// current batch completes.
|
|
m.jobCompleteChan <- curBatch.complete
|
|
|
|
// Notify listener of a newly-started rescan.
|
|
if m.msgs != nil {
|
|
m.msgs <- (*RescanStartedMsg)(job)
|
|
}
|
|
} else {
|
|
// Add job to waiting batch.
|
|
nextBatch.merge(job)
|
|
|
|
// Send the channel that is closed when the
|
|
// waiting batch completes.
|
|
m.jobCompleteChan <- nextBatch.complete
|
|
}
|
|
|
|
case status := <-m.status:
|
|
switch s := status.(type) {
|
|
case rescanProgress:
|
|
if m.msgs != nil {
|
|
m.msgs <- &RescanProgressMsg{
|
|
Addresses: curBatch.addrs,
|
|
Height: int32(s),
|
|
}
|
|
}
|
|
|
|
case rescanFinished:
|
|
if m.msgs != nil {
|
|
m.msgs <- &RescanFinishedMsg{
|
|
Addresses: curBatch.addrs,
|
|
Error: s.error,
|
|
}
|
|
}
|
|
curBatch.done()
|
|
|
|
curBatch, nextBatch = nextBatch, newRescanBatch()
|
|
|
|
if !curBatch.empty() {
|
|
job := curBatch.job()
|
|
m.sendJob <- job
|
|
if m.msgs != nil {
|
|
m.msgs <- (*RescanStartedMsg)(job)
|
|
}
|
|
}
|
|
|
|
default:
|
|
// Unexpected status message
|
|
panic(s)
|
|
}
|
|
|
|
case <-m.quit:
|
|
break out
|
|
}
|
|
}
|
|
close(m.sendJob)
|
|
if m.msgs != nil {
|
|
close(m.msgs)
|
|
}
|
|
m.wg.Done()
|
|
}
|
|
|
|
// rpcHandler reads jobs sent by the jobHandler and sends the rpc requests
|
|
// to perform the rescan. New jobs are not read until a rescan finishes.
|
|
// The jobHandler is notified when the processing the rescan finishes.
|
|
func (m *RescanManager) rpcHandler() {
|
|
for job := range m.sendJob {
|
|
var addrs []btcutil.Address
|
|
for _, accountAddrs := range job.Addresses {
|
|
addrs = append(addrs, accountAddrs...)
|
|
}
|
|
client, err := accessClient()
|
|
if err != nil {
|
|
m.MarkFinished(rescanFinished{err})
|
|
return
|
|
}
|
|
err = client.Rescan(job.StartHeight, addrs, job.OutPoints)
|
|
if err != nil {
|
|
m.MarkFinished(rescanFinished{err})
|
|
}
|
|
}
|
|
m.wg.Done()
|
|
}
|
|
|
|
// RescanJob is a job to be processed by the RescanManager. The job includes
|
|
// a set of account's addresses, a starting height to begin the rescan, and
|
|
// outpoints spendable by the addresses thought to be unspent.
|
|
type RescanJob struct {
|
|
Addresses map[*Account][]btcutil.Address
|
|
OutPoints []*btcwire.OutPoint
|
|
StartHeight int32
|
|
}
|
|
|
|
// Merge merges the work from k into j, setting the starting height to
|
|
// the minimum of the two jobs. This method does not check for
|
|
// duplicate addresses or outpoints.
|
|
func (j *RescanJob) Merge(k *RescanJob) {
|
|
for acct, addrs := range k.Addresses {
|
|
j.Addresses[acct] = append(j.Addresses[acct], addrs...)
|
|
}
|
|
for _, op := range k.OutPoints {
|
|
j.OutPoints = append(j.OutPoints, op)
|
|
}
|
|
if k.StartHeight < j.StartHeight {
|
|
j.StartHeight = k.StartHeight
|
|
}
|
|
}
|
|
|
|
// SubmitJob submits a RescanJob to the RescanManager. A channel is returned
|
|
// that is closed once the rescan request for the job completes.
|
|
func (m *RescanManager) SubmitJob(job *RescanJob) <-chan struct{} {
|
|
m.addJob <- job
|
|
return <-m.jobCompleteChan
|
|
}
|
|
|
|
// MarkProgress messages the RescanManager with the height of the block
|
|
// last processed by a running rescan.
|
|
func (m *RescanManager) MarkProgress(height rescanProgress) {
|
|
m.status <- height
|
|
}
|
|
|
|
// MarkFinished messages the RescanManager that the currently running rescan
|
|
// finished, or errored prematurely.
|
|
func (m *RescanManager) MarkFinished(finished rescanFinished) {
|
|
m.status <- finished
|
|
}
|