add Link to stop Stopper when an upstream Stopper is stopped

This commit is contained in:
Alex Grintsvayg 2018-06-13 12:37:19 -04:00
parent d062d4ad7f
commit de40df28c1
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5

View file

@ -2,12 +2,17 @@ package stopOnce
import "sync" import "sync"
// Chan is a receive-only channel
type Chan <-chan struct{}
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
type Stopper struct { type Stopper struct {
sync.WaitGroup sync.WaitGroup
ch chan struct{} ch chan struct{}
once sync.Once once sync.Once
} }
// New allocates and returns a new Stopper instance
func New() *Stopper { func New() *Stopper {
s := &Stopper{} s := &Stopper{}
s.ch = make(chan struct{}) s.ch = make(chan struct{})
@ -15,17 +20,32 @@ func New() *Stopper {
return s return s
} }
func (s *Stopper) Ch() <-chan struct{} { // Ch returns a channel that will be closed when Stop is called
func (s *Stopper) Ch() Chan {
return s.ch return s.ch
} }
// Stop closes the stopper channel. It is safe to call Stop many times. The channel will only be closed the first time.
func (s *Stopper) Stop() { func (s *Stopper) Stop() {
s.once.Do(func() { s.once.Do(func() {
close(s.ch) close(s.ch)
}) })
} }
// StopAndWait is a convenience method to close the channel and wait for goroutines to return
func (s *Stopper) StopAndWait() { func (s *Stopper) StopAndWait() {
s.Stop() s.Stop()
s.Wait() s.Wait()
} }
// Link will stop s if upstream is stopped.
// If you use Link, make sure you stop `s` when you're done with it. Otherwise this goroutine will be leaked.
func (s *Stopper) Link(upstream Chan) {
go func() {
select {
case <-upstream: // linked Stopper is stopped
s.Stop()
case <-s.Ch(): // this Stopper is stopped
}
}()
}