ADD support for stopper Wait propagation. #77
2 changed files with 96 additions and 8 deletions
|
@ -25,9 +25,10 @@ type Chan <-chan struct{}
|
|||
|
||||
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
|
||||
type Group struct {
|
||||
sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
parents []*Group
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
mu *sync.Mutex
|
||||
waitingOn map[string]int
|
||||
|
@ -35,11 +36,11 @@ type Group struct {
|
|||
type Stopper = Group
|
||||
|
||||
// New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped.
|
||||
func New(parent ...*Group) *Group {
|
||||
s := &Group{}
|
||||
func New(parents ...*Group) *Group {
|
||||
s := &Group{parents: parents}
|
||||
ctx := context.Background()
|
||||
if len(parent) > 0 && parent[0] != nil {
|
||||
ctx = parent[0].ctx
|
||||
if len(parents) > 0 && parents[0] != nil {
|
||||
ctx = parents[0].ctx
|
||||
}
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
return s
|
||||
|
@ -67,7 +68,7 @@ func (s *Group) Stop() {
|
|||
// StopAndWait is a convenience method to close the channel and wait for goroutines to return.
|
||||
func (s *Group) StopAndWait() {
|
||||
s.Stop()
|
||||
s.Wait()
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
// Child returns a new instance that will be stopped when s is stopped.
|
||||
|
@ -75,6 +76,28 @@ func (s *Group) Child() *Group {
|
|||
return New(s)
|
||||
}
|
||||
|
||||
func (s *Group) Add(delta int) {
|
||||
s.wg.Add(delta)
|
||||
for _, parent := range s.parents {
|
||||
if parent != nil {
|
||||
parent.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Group) Wait() {
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Group) Done() {
|
||||
s.wg.Done()
|
||||
for _, parent := range s.parents {
|
||||
if parent != nil {
|
||||
parent.Done()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//AddNamed is the same as Add but will register the functional name of the routine for later output. See `DoneNamed`.
|
||||
func (s *Group) AddNamed(delta int, name string) {
|
||||
s.Add(delta)
|
||||
|
|
65
extras/stop/stop_test.go
Normal file
65
extras/stop/stop_test.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package stop
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestStopParentChild(t *testing.T) {
|
||||
stopper := New()
|
||||
parent := stopper.Child()
|
||||
child := New(parent)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
go func() {
|
||||
time.Sleep(3 * time.Second)
|
||||
stopper.Stop()
|
||||
}()
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
continue
|
||||
case <-child.Ch():
|
||||
break loop
|
||||
}
|
||||
}
|
||||
//Will run forever if stop is not propagated
|
||||
}
|
||||
|
||||
var tracker int
|
||||
var trackerExpected int
|
||||
|
||||
func TestWaitParentChild(t *testing.T) {
|
||||
parent := New()
|
||||
child1 := parent.Child()
|
||||
child2 := parent.Child()
|
||||
child1.Add(1)
|
||||
go runChild("child1", child1, 1*time.Second)
|
||||
child2.Add(1)
|
||||
go runChild("child2", child2, 2*time.Second)
|
||||
time.Sleep(5 * time.Second)
|
||||
parent.Stop()
|
||||
parent.Wait()
|
||||
if tracker != trackerExpected {
|
||||
t.Errorf("Stopper is not waiting for children to finish, expected %d, got %d", trackerExpected, tracker)
|
||||
}
|
||||
}
|
||||
|
||||
func runChild(name string, child *Group, duration time.Duration) {
|
||||
defer child.Done()
|
||||
ticker := time.NewTicker(duration)
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
trackerExpected++
|
||||
println(name + " start tick++")
|
||||
time.Sleep(2 * duration)
|
||||
tracker++
|
||||
println(name + " end tick++")
|
||||
continue loop
|
||||
case <-child.Ch():
|
||||
break loop
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue