chain: add StartTime support to neutrino client

This commit adds support for using StartTime in a rescan in
neutrino. The NeutrinoClient instance can have a birthday set
such that all underlying neutrino rescans are called with that
start time.
This commit is contained in:
Alex 2017-09-19 16:53:23 -06:00 committed by Olaoluwa Osuntokun
parent 555bd5d583
commit e06434ed75

View file

@ -24,6 +24,8 @@ type NeutrinoClient struct {
enqueueNotification chan interface{} enqueueNotification chan interface{}
dequeueNotification chan interface{} dequeueNotification chan interface{}
startTime time.Time
lastProgressSent bool
currentBlock chan *waddrmgr.BlockStamp currentBlock chan *waddrmgr.BlockStamp
quit chan struct{} quit chan struct{}
@ -33,6 +35,7 @@ type NeutrinoClient struct {
started bool started bool
scanning bool scanning bool
finished bool finished bool
isRescan bool
clientMtx sync.Mutex clientMtx sync.Mutex
} }
@ -168,6 +171,8 @@ func (s *NeutrinoClient) Rescan(startHash *chainhash.Hash, addrs []btcutil.Addre
s.rescanQuit = make(chan struct{}) s.rescanQuit = make(chan struct{})
s.scanning = true s.scanning = true
s.finished = false s.finished = false
s.lastProgressSent = false
s.isRescan = true
watchOutPoints := make([]wire.OutPoint, 0, len(outPoints)) watchOutPoints := make([]wire.OutPoint, 0, len(outPoints))
for _, op := range outPoints { for _, op := range outPoints {
watchOutPoints = append(watchOutPoints, *op) watchOutPoints = append(watchOutPoints, *op)
@ -202,6 +207,7 @@ func (s *NeutrinoClient) Rescan(startHash *chainhash.Hash, addrs []btcutil.Addre
OnBlockDisconnected: s.onBlockDisconnected, OnBlockDisconnected: s.onBlockDisconnected,
}), }),
neutrino.StartBlock(&waddrmgr.BlockStamp{Hash: *startHash}), neutrino.StartBlock(&waddrmgr.BlockStamp{Hash: *startHash}),
neutrino.StartTime(s.startTime),
neutrino.QuitChan(s.rescanQuit), neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...), neutrino.WatchAddrs(addrs...),
neutrino.WatchOutPoints(watchOutPoints...), neutrino.WatchOutPoints(watchOutPoints...),
@ -238,8 +244,9 @@ func (s *NeutrinoClient) NotifyReceived(addrs []btcutil.Address) error {
s.rescanQuit = make(chan struct{}) s.rescanQuit = make(chan struct{})
s.scanning = true s.scanning = true
// Don't need RescanFinished notifications. // Don't need RescanFinished or RescanProgress notifications.
s.finished = true s.finished = true
s.lastProgressSent = true
// Rescan with just the specified addresses. // Rescan with just the specified addresses.
s.rescan = s.CS.NewRescan( s.rescan = s.CS.NewRescan(
@ -248,6 +255,7 @@ func (s *NeutrinoClient) NotifyReceived(addrs []btcutil.Address) error {
OnFilteredBlockConnected: s.onFilteredBlockConnected, OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected, OnBlockDisconnected: s.onBlockDisconnected,
}), }),
neutrino.StartTime(s.startTime),
neutrino.QuitChan(s.rescanQuit), neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...), neutrino.WatchAddrs(addrs...),
) )
@ -260,6 +268,19 @@ func (s *NeutrinoClient) Notifications() <-chan interface{} {
return s.dequeueNotification return s.dequeueNotification
} }
// SetStartTime is a non-interface method to set the birthday of the wallet
// using this object. Since only a single rescan at a time is currently
// supported, only one birthday needs to be set. This does not fully restart a
// running rescan, so should not be used to update a rescan while it is running.
// TODO: When factoring out to multiple rescans per Neutrino client, add a
// birthday per client.
func (s *NeutrinoClient) SetStartTime(startTime time.Time) {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
s.startTime = startTime
}
// onFilteredBlockConnected sends appropriate notifications to the notification // onFilteredBlockConnected sends appropriate notifications to the notification
// channel. // channel.
func (s *NeutrinoClient) onFilteredBlockConnected(height int32, func (s *NeutrinoClient) onFilteredBlockConnected(height int32,
@ -291,11 +312,14 @@ func (s *NeutrinoClient) onFilteredBlockConnected(height int32,
case <-s.rescanQuit: case <-s.rescanQuit:
return return
} }
// Handle RescanFinished notification if required.
bs, err := s.CS.BestSnapshot() bs, err := s.CS.BestSnapshot()
if err != nil { if err != nil {
log.Errorf("Can't get chain service's best block: %s", err) log.Errorf("Can't get chain service's best block: %s", err)
return return
} }
if bs.Hash == header.BlockHash() { if bs.Hash == header.BlockHash() {
// Only send the RescanFinished notification once. // Only send the RescanFinished notification once.
s.clientMtx.Lock() s.clientMtx.Lock()
@ -303,18 +327,25 @@ func (s *NeutrinoClient) onFilteredBlockConnected(height int32,
s.clientMtx.Unlock() s.clientMtx.Unlock()
return return
} }
s.finished = true // Only send the RescanFinished notification once the
// underlying chain service sees itself as current.
current := s.CS.IsCurrent() && s.lastProgressSent
if current {
s.finished = true
}
s.clientMtx.Unlock() s.clientMtx.Unlock()
select { if current {
case s.enqueueNotification <- &RescanFinished{ select {
Hash: &bs.Hash, case s.enqueueNotification <- &RescanFinished{
Height: bs.Height, Hash: &bs.Hash,
Time: header.Timestamp, Height: bs.Height,
}: Time: header.Timestamp,
case <-s.quit: }:
return case <-s.quit:
case <-s.rescanQuit: return
return case <-s.rescanQuit:
return
}
} }
} }
} }
@ -338,17 +369,58 @@ func (s *NeutrinoClient) onBlockDisconnected(hash *chainhash.Hash, height int32,
func (s *NeutrinoClient) onBlockConnected(hash *chainhash.Hash, height int32, func (s *NeutrinoClient) onBlockConnected(hash *chainhash.Hash, height int32,
time time.Time) { time time.Time) {
// TODO: Move this closure out and parameterize it? Is it useful
select { // outside here?
case s.enqueueNotification <- BlockConnected{ sendRescanProgress := func() {
Block: wtxmgr.Block{ select {
Hash: *hash, case s.enqueueNotification <- &RescanProgress{
Hash: hash,
Height: height, Height: height,
}, Time: time,
Time: time, }:
}: case <-s.quit:
case <-s.quit: case <-s.rescanQuit:
case <-s.rescanQuit: }
}
// Only send BlockConnected notification if we're processing blocks
// before the birthday. Otherwise, we can just update using
// RescanProgress notifications.
if time.Before(s.startTime) {
// Send a RescanProgress notification every 10K blocks.
if height%10000 == 0 {
s.clientMtx.Lock()
shouldSend := s.isRescan && !s.finished
s.clientMtx.Unlock()
if shouldSend {
sendRescanProgress()
}
}
} else {
// Send a RescanProgress notification if we're just going over
// the boundary between pre-birthday and post-birthday blocks,
// and note that we've sent it.
s.clientMtx.Lock()
if !s.lastProgressSent {
shouldSend := s.isRescan && !s.finished
if shouldSend {
s.clientMtx.Unlock()
sendRescanProgress()
s.clientMtx.Lock()
s.lastProgressSent = true
}
}
s.clientMtx.Unlock()
select {
case s.enqueueNotification <- BlockConnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: time,
}:
case <-s.quit:
case <-s.rescanQuit:
}
} }
} }