Implement a batching rescan manager.
Recent btcd versions only allow one rescan to run at any given time per websocket client. To better handle this, a new set of goroutines are started by the account manager which batch and serialize rescan jobs. If no rescans are currently running, a new rescan starts. If a rescan is already being processed, the request is queued and runs after the current rescan finishes. For any additional incoming requests before the current rescan finishes, the requests are merged with the currently-waiting request so both can be handled with a single rescan. This change also prepares for rescan progress notifications from btcd, but are still unhandled until the necessary details for partially-synced addresses are added to the wallet file format.
This commit is contained in:
parent
e22d221ea8
commit
59845d9c21
6 changed files with 396 additions and 72 deletions
93
account.go
93
account.go
|
@ -372,7 +372,6 @@ func (a *Account) ImportPrivateKey(pk []byte, compressed bool,
|
|||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
addrStr := addr.EncodeAddress()
|
||||
|
||||
// Immediately write wallet to disk.
|
||||
AcctMgr.ds.ScheduleWalletWrite(a)
|
||||
|
@ -380,41 +379,25 @@ func (a *Account) ImportPrivateKey(pk []byte, compressed bool,
|
|||
return "", fmt.Errorf("cannot write account: %v", err)
|
||||
}
|
||||
|
||||
addrStr := addr.EncodeAddress()
|
||||
|
||||
// Rescan blockchain for transactions with txout scripts paying to the
|
||||
// imported address.
|
||||
//
|
||||
// TODO(jrick): As btcd only allows a single rescan per websocket client
|
||||
// to run at any given time, a separate goroutine should run for
|
||||
// exclusively handling rescan events.
|
||||
if rescan {
|
||||
go func(addr btcutil.Address, aname string) {
|
||||
addrStr := addr.EncodeAddress()
|
||||
log.Infof("Beginning rescan (height %d) for address %s",
|
||||
bs.Height, addrStr)
|
||||
addrs := []btcutil.Address{addr}
|
||||
job := &RescanJob{
|
||||
Addresses: map[*Account][]btcutil.Address{a: addrs},
|
||||
OutPoints: nil,
|
||||
StartHeight: 0,
|
||||
}
|
||||
|
||||
jsonErr := Rescan(CurrentServerConn(), bs.Height,
|
||||
[]string{addrStr}, nil)
|
||||
if jsonErr != nil {
|
||||
log.Errorf("Rescan for imported address %s failed: %v",
|
||||
addrStr, jsonErr.Message)
|
||||
return
|
||||
}
|
||||
|
||||
AcctMgr.Grab()
|
||||
defer AcctMgr.Release()
|
||||
a, err := AcctMgr.Account(aname)
|
||||
if err != nil {
|
||||
log.Errorf("Account for imported address %s missing: %v",
|
||||
addrStr, err)
|
||||
return
|
||||
}
|
||||
if err := a.MarkAddressSynced(addr); err != nil {
|
||||
log.Errorf("Unable to mark rescanned address as synced: %v", err)
|
||||
return
|
||||
}
|
||||
AcctMgr.ds.FlushAccount(a)
|
||||
log.Infof("Finished rescan for imported address %s", addrStr)
|
||||
}(addr, a.name)
|
||||
// Submit rescan job and log when the import has completed.
|
||||
// Do not block on finishing the rescan.
|
||||
doneChan := AcctMgr.rm.SubmitJob(job)
|
||||
go func() {
|
||||
<-doneChan
|
||||
log.Infof("Finished import for address %s", addrStr)
|
||||
}()
|
||||
}
|
||||
|
||||
// Associate the imported address with this account.
|
||||
|
@ -498,13 +481,13 @@ func (a *Account) Track() {
|
|||
}
|
||||
}
|
||||
|
||||
// RescanActiveAddresses requests btcd to rescan the blockchain for new
|
||||
// transactions to all active wallet addresses. This is needed for
|
||||
// catching btcwallet up to a long-running btcd process, as otherwise
|
||||
// it would have missed notifications as blocks are attached to the
|
||||
// main chain.
|
||||
func (a *Account) RescanActiveAddresses() {
|
||||
// Determine the block to begin the rescan from.
|
||||
// RescanActiveJob creates a RescanJob for all active addresses in the
|
||||
// account. This is needed for catching btcwallet up to a long-running
|
||||
// btcd process, as otherwise it would have missed notifications as
|
||||
// blocks are attached to the main chain.
|
||||
func (a *Account) RescanActiveJob() *RescanJob {
|
||||
// Determine the block necesary to start the rescan for all active
|
||||
// addresses.
|
||||
height := int32(0)
|
||||
if a.fullRescan {
|
||||
// Need to perform a complete rescan since the wallet creation
|
||||
|
@ -516,25 +499,23 @@ func (a *Account) RescanActiveAddresses() {
|
|||
height = a.SyncHeight()
|
||||
}
|
||||
|
||||
log.Infof("Beginning rescan (height %d) for account '%v'",
|
||||
height, a.name)
|
||||
|
||||
// Rescan active addresses starting at the determined block height.
|
||||
addrs := a.SortedActiveAddresses()
|
||||
addrStrs := make([]string, 0, len(addrs))
|
||||
for i := range addrs {
|
||||
addrStrs = append(addrStrs, addrs[i].Address().EncodeAddress())
|
||||
actives := a.SortedActiveAddresses()
|
||||
addrs := make([]btcutil.Address, 0, len(actives))
|
||||
for i := range actives {
|
||||
addrs = append(addrs, actives[i].Address())
|
||||
}
|
||||
unspentRecvTxOuts := a.TxStore.UnspentOutputs()
|
||||
unspentOutPoints := make([]*btcwire.OutPoint, 0, len(unspentRecvTxOuts))
|
||||
for _, record := range unspentRecvTxOuts {
|
||||
unspentOutPoints = append(unspentOutPoints, record.OutPoint())
|
||||
}
|
||||
Rescan(CurrentServerConn(), height, addrStrs, unspentOutPoints)
|
||||
a.MarkAllSynced()
|
||||
AcctMgr.ds.FlushAccount(a)
|
||||
|
||||
log.Infof("Finished rescan for account '%v'", a.name)
|
||||
unspents := a.TxStore.UnspentOutputs()
|
||||
outpoints := make([]*btcwire.OutPoint, 0, len(unspents))
|
||||
for i := range unspents {
|
||||
outpoints = append(outpoints, unspents[i].OutPoint())
|
||||
}
|
||||
|
||||
return &RescanJob{
|
||||
Addresses: map[*Account][]btcutil.Address{a: addrs},
|
||||
OutPoints: outpoints,
|
||||
StartHeight: height,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Account) ResendUnminedTxs() {
|
||||
|
|
89
acctmgr.go
89
acctmgr.go
|
@ -49,8 +49,10 @@ type AccountManager struct {
|
|||
accessAll chan *accessAllRequest
|
||||
add chan *Account
|
||||
remove chan *Account
|
||||
rescanMsgs chan RescanMsg
|
||||
|
||||
ds *DiskSyncer // might move to inside Start
|
||||
ds *DiskSyncer
|
||||
rm *RescanManager
|
||||
}
|
||||
|
||||
// NewAccountManager returns a new AccountManager.
|
||||
|
@ -62,22 +64,29 @@ func NewAccountManager() *AccountManager {
|
|||
accessAll: make(chan *accessAllRequest),
|
||||
add: make(chan *Account),
|
||||
remove: make(chan *Account),
|
||||
rescanMsgs: make(chan RescanMsg, 1),
|
||||
}
|
||||
am.ds = NewDiskSyncer(am)
|
||||
am.rm = NewRescanManager(am.rescanMsgs)
|
||||
return am
|
||||
}
|
||||
|
||||
// Start maintains accounts and structures for quick lookups for account
|
||||
// information. Access to these structures must be done through with the
|
||||
// channels in the AccountManger struct fields. This function never returns
|
||||
// and should be called as a new goroutine.
|
||||
// Start starts the goroutines required to run the AccountManager.
|
||||
func (am *AccountManager) Start() {
|
||||
// Ready the semaphore - can't grab unless the manager has started.
|
||||
am.bsem <- struct{}{}
|
||||
|
||||
// Start the account manager's disk syncer.
|
||||
go am.accountHandler()
|
||||
go am.rescanListener()
|
||||
go am.ds.Start()
|
||||
go am.rm.Start()
|
||||
}
|
||||
|
||||
// accountHandler maintains accounts and structures for quick lookups for
|
||||
// account information. Access to these structures must be done through
|
||||
// with the channels in the AccountManger struct fields. This function
|
||||
// never returns and should be called as a new goroutine.
|
||||
func (am *AccountManager) accountHandler() {
|
||||
// List and map of all accounts.
|
||||
l := list.New()
|
||||
m := make(map[string]*Account)
|
||||
|
@ -134,6 +143,50 @@ func (am *AccountManager) Start() {
|
|||
}
|
||||
}
|
||||
|
||||
// rescanListener listens for messages from the rescan manager and marks
|
||||
// accounts and addresses as synced.
|
||||
func (am *AccountManager) rescanListener() {
|
||||
for msg := range am.rescanMsgs {
|
||||
AcctMgr.Grab()
|
||||
switch e := msg.(type) {
|
||||
case *RescanStartedMsg:
|
||||
// Log the newly-started rescan.
|
||||
n := 0
|
||||
for _, addrs := range e.Addresses {
|
||||
n += len(addrs)
|
||||
}
|
||||
noun := pickNoun(n, "address", "addresses")
|
||||
log.Infof("Started rescan at height %d for %d %s", e.StartHeight, n, noun)
|
||||
|
||||
case *RescanProgressMsg:
|
||||
// TODO: mark addresses as partially synced.
|
||||
|
||||
case *RescanFinishedMsg:
|
||||
if e.Error != nil {
|
||||
log.Errorf("Rescan failed: %v", e.Error.Message)
|
||||
break
|
||||
}
|
||||
|
||||
n := 0
|
||||
for acct, addrs := range e.Addresses {
|
||||
for i := range addrs {
|
||||
n++
|
||||
err := acct.MarkAddressSynced(addrs[i])
|
||||
if err != nil {
|
||||
log.Errorf("Error marking address synced: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
AcctMgr.ds.FlushAccount(acct)
|
||||
}
|
||||
|
||||
noun := pickNoun(n, "address", "addresses")
|
||||
log.Infof("Finished rescan for %d %s", n, noun)
|
||||
}
|
||||
AcctMgr.Release()
|
||||
}
|
||||
}
|
||||
|
||||
// Grab grabs the account manager's binary semaphore. A custom semaphore
|
||||
// is used instead of a sync.Mutex so the account manager's disk syncer
|
||||
// can grab the semaphore from a select statement.
|
||||
|
@ -520,18 +573,28 @@ func (am *AccountManager) ListUnspent(minconf, maxconf int,
|
|||
|
||||
// RescanActiveAddresses begins a rescan for all active addresses for
|
||||
// each account.
|
||||
//
|
||||
// TODO(jrick): batch addresses for all accounts together so multiple
|
||||
// rescan commands can be avoided.
|
||||
func (am *AccountManager) RescanActiveAddresses() {
|
||||
for _, account := range am.AllAccounts() {
|
||||
account.RescanActiveAddresses()
|
||||
var job *RescanJob
|
||||
for _, a := range am.AllAccounts() {
|
||||
acctJob := a.RescanActiveJob()
|
||||
if job == nil {
|
||||
job = acctJob
|
||||
} else {
|
||||
job.Merge(acctJob)
|
||||
}
|
||||
}
|
||||
if job == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Submit merged job and block until rescan completes.
|
||||
jobFinished := am.rm.SubmitJob(job)
|
||||
<-jobFinished
|
||||
}
|
||||
|
||||
func (am *AccountManager) ResendUnminedTxs() {
|
||||
for _, account := range am.AllAccounts() {
|
||||
account.ResendUnminedTxs()
|
||||
for _, a := range am.AllAccounts() {
|
||||
a.ResendUnminedTxs()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
2
cmd.go
2
cmd.go
|
@ -147,7 +147,7 @@ func main() {
|
|||
updateOldFileLocations()
|
||||
|
||||
// Start account manager and open accounts.
|
||||
go AcctMgr.Start()
|
||||
AcctMgr.Start()
|
||||
AcctMgr.OpenAccounts()
|
||||
|
||||
// Read CA file to verify a btcd TLS connection.
|
||||
|
|
|
@ -207,12 +207,17 @@ func NewDiskSyncer(am *AccountManager) *DiskSyncer {
|
|||
}
|
||||
}
|
||||
|
||||
// Start starts the disk syncer. It manages a set of "dirty" account files
|
||||
// Start starts the goroutines required to run the DiskSyncer.
|
||||
func (ds *DiskSyncer) Start() {
|
||||
go ds.handler()
|
||||
}
|
||||
|
||||
// handler runs the disk syncer. It manages a set of "dirty" account files
|
||||
// which must be written to disk, and synchronizes all writes in a single
|
||||
// goroutine. Periodic flush operations may be signaled by an AccountManager.
|
||||
//
|
||||
// This never returns and is should be called from a new goroutine.
|
||||
func (ds *DiskSyncer) Start() {
|
||||
func (ds *DiskSyncer) handler() {
|
||||
netdir := networkDir(cfg.Net())
|
||||
if err := checkCreateDir(netdir); err != nil {
|
||||
log.Errorf("Unable to create or write to account directory: %v", err)
|
||||
|
|
9
log.go
9
log.go
|
@ -97,3 +97,12 @@ func setLogLevel(logLevel string) []seelog.LoggerInterface {
|
|||
|
||||
return loggers
|
||||
}
|
||||
|
||||
// pickNoun returns the singular or plural form of a noun depending
|
||||
// on the count n.
|
||||
func pickNoun(n int, singular, plural string) string {
|
||||
if n == 1 {
|
||||
return singular
|
||||
}
|
||||
return plural
|
||||
}
|
||||
|
|
266
rescan.go
Normal file
266
rescan.go
Normal file
|
@ -0,0 +1,266 @@
|
|||
/*
|
||||
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
||||
*
|
||||
* Permission to use, copy, modify, and distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice appear in all copies.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/conformal/btcjson"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwire"
|
||||
)
|
||||
|
||||
// RescanMsg is the interface type for messages sent to the
|
||||
// RescanManager's message channel.
|
||||
type RescanMsg interface {
|
||||
ImplementsRescanMsg()
|
||||
}
|
||||
|
||||
// RescanStartedMsg reports the job being processed for a new
|
||||
// rescan.
|
||||
type RescanStartedMsg RescanJob
|
||||
|
||||
// ImplementsRescanMsg is implemented to satisify the RescanMsg
|
||||
// interface.
|
||||
func (r *RescanStartedMsg) ImplementsRescanMsg() {}
|
||||
|
||||
// RescanProgressMsg reports the current progress made by a rescan
|
||||
// for a set of account's addresses.
|
||||
type RescanProgressMsg struct {
|
||||
Addresses map[*Account][]btcutil.Address
|
||||
Height int32
|
||||
}
|
||||
|
||||
// ImplementsRescanMsg is implemented to satisify the RescanMsg
|
||||
// interface.
|
||||
func (r *RescanProgressMsg) ImplementsRescanMsg() {}
|
||||
|
||||
// RescanFinishedMsg reports the set of account's addresses of a
|
||||
// possibly-finished rescan, or an error if the rescan failed.
|
||||
type RescanFinishedMsg struct {
|
||||
Addresses map[*Account][]btcutil.Address
|
||||
Error *btcjson.Error
|
||||
}
|
||||
|
||||
// ImplementsRescanMsg is implemented to satisify the RescanMsg
|
||||
// interface.
|
||||
func (r *RescanFinishedMsg) ImplementsRescanMsg() {}
|
||||
|
||||
// RescanManager manages a set of current and to be processed account's
|
||||
// addresses, batching waiting jobs together to minimize the total time
|
||||
// needed to rescan many separate jobs. Rescan requests are processed
|
||||
// one at a time, and the next batch does not run until the current
|
||||
// has finished.
|
||||
type RescanManager struct {
|
||||
addJob chan *RescanJob
|
||||
sendJob chan *RescanJob
|
||||
status chan interface{} // rescanProgress and rescanFinished
|
||||
msgs chan RescanMsg
|
||||
jobCompleteChan chan chan struct{}
|
||||
}
|
||||
|
||||
// NewRescanManager creates a new RescanManger. If msgChan is non-nil,
|
||||
// rescan messages are sent to the channel for additional processing by
|
||||
// the caller.
|
||||
func NewRescanManager(msgChan chan RescanMsg) *RescanManager {
|
||||
return &RescanManager{
|
||||
addJob: make(chan *RescanJob, 1),
|
||||
sendJob: make(chan *RescanJob, 1),
|
||||
status: make(chan interface{}, 1),
|
||||
msgs: msgChan,
|
||||
jobCompleteChan: make(chan chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the goroutines to run the RescanManager.
|
||||
func (m *RescanManager) Start() {
|
||||
go m.jobHandler()
|
||||
go m.rpcHandler()
|
||||
}
|
||||
|
||||
type rescanBatch struct {
|
||||
addrs map[*Account][]btcutil.Address
|
||||
outpoints map[btcwire.OutPoint]struct{}
|
||||
height int32
|
||||
complete chan struct{}
|
||||
}
|
||||
|
||||
func newRescanBatch() *rescanBatch {
|
||||
return &rescanBatch{
|
||||
addrs: map[*Account][]btcutil.Address{},
|
||||
outpoints: map[btcwire.OutPoint]struct{}{},
|
||||
height: -1,
|
||||
complete: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *rescanBatch) done() {
|
||||
close(b.complete)
|
||||
}
|
||||
|
||||
func (b *rescanBatch) empty() bool {
|
||||
return len(b.addrs) == 0
|
||||
}
|
||||
|
||||
func (b *rescanBatch) job() *RescanJob {
|
||||
// Create slice of outpoint points from the batch's set.
|
||||
outpoints := make([]*btcwire.OutPoint, 0, len(b.outpoints))
|
||||
for outpoint := range b.outpoints {
|
||||
opCopy := outpoint
|
||||
outpoints = append(outpoints, &opCopy)
|
||||
}
|
||||
|
||||
return &RescanJob{
|
||||
Addresses: b.addrs,
|
||||
OutPoints: outpoints,
|
||||
StartHeight: b.height,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *rescanBatch) merge(job *RescanJob) {
|
||||
for acct, addr := range job.Addresses {
|
||||
b.addrs[acct] = append(b.addrs[acct], addr...)
|
||||
}
|
||||
for _, op := range job.OutPoints {
|
||||
b.outpoints[*op] = struct{}{}
|
||||
}
|
||||
if b.height == -1 || job.StartHeight < b.height {
|
||||
b.height = job.StartHeight
|
||||
}
|
||||
}
|
||||
|
||||
// Status types for the handler.
|
||||
type rescanProgress int32
|
||||
type rescanFinished *btcjson.Error
|
||||
|
||||
// jobHandler runs the RescanManager's for-select loop to manage rescan jobs
|
||||
// and dispatch requests.
|
||||
func (m *RescanManager) jobHandler() {
|
||||
curBatch := newRescanBatch()
|
||||
nextBatch := newRescanBatch()
|
||||
|
||||
for {
|
||||
select {
|
||||
case job := <-m.addJob:
|
||||
if curBatch.empty() {
|
||||
// Set current batch as this job and send
|
||||
// request.
|
||||
curBatch.merge(job)
|
||||
m.sendJob <- job
|
||||
|
||||
// Send the channel that is closed when the
|
||||
// current batch completes.
|
||||
m.jobCompleteChan <- curBatch.complete
|
||||
|
||||
// Notify listener of a newly-started rescan.
|
||||
if m.msgs != nil {
|
||||
m.msgs <- (*RescanStartedMsg)(job)
|
||||
}
|
||||
} else {
|
||||
// Add job to waiting batch.
|
||||
nextBatch.merge(job)
|
||||
|
||||
// Send the channel that is closed when the
|
||||
// waiting batch completes.
|
||||
m.jobCompleteChan <- nextBatch.complete
|
||||
}
|
||||
|
||||
case status := <-m.status:
|
||||
switch s := status.(type) {
|
||||
case rescanProgress:
|
||||
if m.msgs != nil {
|
||||
m.msgs <- &RescanProgressMsg{
|
||||
Addresses: curBatch.addrs,
|
||||
Height: int32(s),
|
||||
}
|
||||
}
|
||||
|
||||
case rescanFinished:
|
||||
if m.msgs != nil {
|
||||
m.msgs <- &RescanFinishedMsg{
|
||||
Addresses: curBatch.addrs,
|
||||
Error: (*btcjson.Error)(s),
|
||||
}
|
||||
}
|
||||
curBatch.done()
|
||||
|
||||
curBatch, nextBatch = nextBatch, newRescanBatch()
|
||||
|
||||
if !curBatch.empty() {
|
||||
job := curBatch.job()
|
||||
m.sendJob <- curBatch.job()
|
||||
if m.msgs != nil {
|
||||
m.msgs <- (*RescanStartedMsg)(job)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rpcHandler reads jobs sent by the jobHandler and sends the rpc requests
|
||||
// to perform the rescan. New jobs are not read until a rescan finishes.
|
||||
// The jobHandler is notified when the processing the rescan finishes.
|
||||
func (m *RescanManager) rpcHandler() {
|
||||
for job := range m.sendJob {
|
||||
var addrStrs []string
|
||||
for _, addrs := range job.Addresses {
|
||||
for i := range addrs {
|
||||
addrStrs = append(addrStrs, addrs[i].EncodeAddress())
|
||||
}
|
||||
}
|
||||
|
||||
c := CurrentServerConn()
|
||||
jsonErr := Rescan(c, job.StartHeight, addrStrs, job.OutPoints)
|
||||
m.status <- rescanFinished(jsonErr)
|
||||
}
|
||||
}
|
||||
|
||||
// RescanJob is a job to be processed by the RescanManager. The job includes
|
||||
// a set of account's addresses, a starting height to begin the rescan, and
|
||||
// outpoints spendable by the addresses thought to be unspent.
|
||||
type RescanJob struct {
|
||||
Addresses map[*Account][]btcutil.Address
|
||||
OutPoints []*btcwire.OutPoint
|
||||
StartHeight int32
|
||||
}
|
||||
|
||||
// Merge merges the work from k into j, setting the starting height to
|
||||
// the minimum of the two jobs. This method does not check for
|
||||
// duplicate addresses or outpoints.
|
||||
func (j *RescanJob) Merge(k *RescanJob) {
|
||||
for acct, addrs := range k.Addresses {
|
||||
j.Addresses[acct] = append(j.Addresses[acct], addrs...)
|
||||
}
|
||||
for _, op := range k.OutPoints {
|
||||
j.OutPoints = append(j.OutPoints, op)
|
||||
}
|
||||
if k.StartHeight < j.StartHeight {
|
||||
j.StartHeight = k.StartHeight
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitJob submits a RescanJob to the RescanManager. A channel is returned
|
||||
// that is closed once the rescan request for the job completes.
|
||||
func (m *RescanManager) SubmitJob(job *RescanJob) <-chan struct{} {
|
||||
m.addJob <- job
|
||||
return <-m.jobCompleteChan
|
||||
}
|
||||
|
||||
// MarkProgress messages the RescanManager with the height of the block
|
||||
// last processed by a running rescan.
|
||||
func (m *RescanManager) MarkProgress(height int32) {
|
||||
m.status <- rescanProgress(height)
|
||||
}
|
Loading…
Reference in a new issue