diff --git a/chain/neutrino.go b/chain/neutrino.go index e889b72..373e301 100644 --- a/chain/neutrino.go +++ b/chain/neutrino.go @@ -24,6 +24,8 @@ type NeutrinoClient struct { enqueueNotification chan interface{} dequeueNotification chan interface{} + startTime time.Time + lastProgressSent bool currentBlock chan *waddrmgr.BlockStamp quit chan struct{} @@ -33,6 +35,7 @@ type NeutrinoClient struct { started bool scanning bool finished bool + isRescan bool clientMtx sync.Mutex } @@ -168,6 +171,8 @@ func (s *NeutrinoClient) Rescan(startHash *chainhash.Hash, addrs []btcutil.Addre s.rescanQuit = make(chan struct{}) s.scanning = true s.finished = false + s.lastProgressSent = false + s.isRescan = true watchOutPoints := make([]wire.OutPoint, 0, len(outPoints)) for _, op := range outPoints { watchOutPoints = append(watchOutPoints, *op) @@ -202,6 +207,7 @@ func (s *NeutrinoClient) Rescan(startHash *chainhash.Hash, addrs []btcutil.Addre OnBlockDisconnected: s.onBlockDisconnected, }), neutrino.StartBlock(&waddrmgr.BlockStamp{Hash: *startHash}), + neutrino.StartTime(s.startTime), neutrino.QuitChan(s.rescanQuit), neutrino.WatchAddrs(addrs...), neutrino.WatchOutPoints(watchOutPoints...), @@ -238,8 +244,9 @@ func (s *NeutrinoClient) NotifyReceived(addrs []btcutil.Address) error { s.rescanQuit = make(chan struct{}) s.scanning = true - // Don't need RescanFinished notifications. + // Don't need RescanFinished or RescanProgress notifications. s.finished = true + s.lastProgressSent = true // Rescan with just the specified addresses. s.rescan = s.CS.NewRescan( @@ -248,6 +255,7 @@ func (s *NeutrinoClient) NotifyReceived(addrs []btcutil.Address) error { OnFilteredBlockConnected: s.onFilteredBlockConnected, OnBlockDisconnected: s.onBlockDisconnected, }), + neutrino.StartTime(s.startTime), neutrino.QuitChan(s.rescanQuit), neutrino.WatchAddrs(addrs...), ) @@ -260,6 +268,19 @@ func (s *NeutrinoClient) Notifications() <-chan interface{} { return s.dequeueNotification } +// SetStartTime is a non-interface method to set the birthday of the wallet +// using this object. Since only a single rescan at a time is currently +// supported, only one birthday needs to be set. This does not fully restart a +// running rescan, so should not be used to update a rescan while it is running. +// TODO: When factoring out to multiple rescans per Neutrino client, add a +// birthday per client. +func (s *NeutrinoClient) SetStartTime(startTime time.Time) { + s.clientMtx.Lock() + defer s.clientMtx.Unlock() + + s.startTime = startTime +} + // onFilteredBlockConnected sends appropriate notifications to the notification // channel. func (s *NeutrinoClient) onFilteredBlockConnected(height int32, @@ -291,11 +312,14 @@ func (s *NeutrinoClient) onFilteredBlockConnected(height int32, case <-s.rescanQuit: return } + + // Handle RescanFinished notification if required. bs, err := s.CS.BestSnapshot() if err != nil { log.Errorf("Can't get chain service's best block: %s", err) return } + if bs.Hash == header.BlockHash() { // Only send the RescanFinished notification once. s.clientMtx.Lock() @@ -303,18 +327,25 @@ func (s *NeutrinoClient) onFilteredBlockConnected(height int32, s.clientMtx.Unlock() return } - s.finished = true + // Only send the RescanFinished notification once the + // underlying chain service sees itself as current. + current := s.CS.IsCurrent() && s.lastProgressSent + if current { + s.finished = true + } s.clientMtx.Unlock() - select { - case s.enqueueNotification <- &RescanFinished{ - Hash: &bs.Hash, - Height: bs.Height, - Time: header.Timestamp, - }: - case <-s.quit: - return - case <-s.rescanQuit: - return + if current { + select { + case s.enqueueNotification <- &RescanFinished{ + Hash: &bs.Hash, + Height: bs.Height, + Time: header.Timestamp, + }: + case <-s.quit: + return + case <-s.rescanQuit: + return + } } } } @@ -338,17 +369,58 @@ func (s *NeutrinoClient) onBlockDisconnected(hash *chainhash.Hash, height int32, func (s *NeutrinoClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { - - select { - case s.enqueueNotification <- BlockConnected{ - Block: wtxmgr.Block{ - Hash: *hash, + // TODO: Move this closure out and parameterize it? Is it useful + // outside here? + sendRescanProgress := func() { + select { + case s.enqueueNotification <- &RescanProgress{ + Hash: hash, Height: height, - }, - Time: time, - }: - case <-s.quit: - case <-s.rescanQuit: + Time: time, + }: + case <-s.quit: + case <-s.rescanQuit: + } + } + // Only send BlockConnected notification if we're processing blocks + // before the birthday. Otherwise, we can just update using + // RescanProgress notifications. + if time.Before(s.startTime) { + // Send a RescanProgress notification every 10K blocks. + if height%10000 == 0 { + s.clientMtx.Lock() + shouldSend := s.isRescan && !s.finished + s.clientMtx.Unlock() + if shouldSend { + sendRescanProgress() + } + } + } else { + // Send a RescanProgress notification if we're just going over + // the boundary between pre-birthday and post-birthday blocks, + // and note that we've sent it. + s.clientMtx.Lock() + if !s.lastProgressSent { + shouldSend := s.isRescan && !s.finished + if shouldSend { + s.clientMtx.Unlock() + sendRescanProgress() + s.clientMtx.Lock() + s.lastProgressSent = true + } + } + s.clientMtx.Unlock() + select { + case s.enqueueNotification <- BlockConnected{ + Block: wtxmgr.Block{ + Hash: *hash, + Height: height, + }, + Time: time, + }: + case <-s.quit: + case <-s.rescanQuit: + } } }