ADD support for stopper Wait propagation. #77

Closed
tiger5226 wants to merge 1 commit from stopper_propagation into master
2 changed files with 96 additions and 8 deletions

View file

@ -25,7 +25,8 @@ type Chan <-chan struct{}
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines // Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
type Group struct { type Group struct {
sync.WaitGroup wg sync.WaitGroup
parents []*Group
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -35,11 +36,11 @@ type Group struct {
type Stopper = Group type Stopper = Group
// New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped. // New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped.
func New(parent ...*Group) *Group { func New(parents ...*Group) *Group {
s := &Group{} s := &Group{parents: parents}
ctx := context.Background() ctx := context.Background()
if len(parent) > 0 && parent[0] != nil { if len(parents) > 0 && parents[0] != nil {
ctx = parent[0].ctx ctx = parents[0].ctx
} }
s.ctx, s.cancel = context.WithCancel(ctx) s.ctx, s.cancel = context.WithCancel(ctx)
return s return s
@ -67,7 +68,7 @@ func (s *Group) Stop() {
// StopAndWait is a convenience method to close the channel and wait for goroutines to return. // StopAndWait is a convenience method to close the channel and wait for goroutines to return.
func (s *Group) StopAndWait() { func (s *Group) StopAndWait() {
s.Stop() s.Stop()
s.Wait() s.wg.Wait()
} }
// Child returns a new instance that will be stopped when s is stopped. // Child returns a new instance that will be stopped when s is stopped.
@ -75,6 +76,28 @@ func (s *Group) Child() *Group {
return New(s) return New(s)
} }
func (s *Group) Add(delta int) {
s.wg.Add(delta)
for _, parent := range s.parents {
if parent != nil {
parent.Add(1)
}
}
}
func (s *Group) Wait() {
s.wg.Wait()
}
func (s *Group) Done() {
s.wg.Done()
for _, parent := range s.parents {
if parent != nil {
parent.Done()
}
}
}
//AddNamed is the same as Add but will register the functional name of the routine for later output. See `DoneNamed`. //AddNamed is the same as Add but will register the functional name of the routine for later output. See `DoneNamed`.
func (s *Group) AddNamed(delta int, name string) { func (s *Group) AddNamed(delta int, name string) {
s.Add(delta) s.Add(delta)

65
extras/stop/stop_test.go Normal file
View file

@ -0,0 +1,65 @@
package stop
import (
"testing"
"time"
)
func TestStopParentChild(t *testing.T) {
stopper := New()
parent := stopper.Child()
child := New(parent)
ticker := time.NewTicker(1 * time.Second)
go func() {
time.Sleep(3 * time.Second)
stopper.Stop()
}()
loop:
for {
select {
case <-ticker.C:
continue
case <-child.Ch():
break loop
}
}
//Will run forever if stop is not propagated
}
var tracker int
var trackerExpected int
func TestWaitParentChild(t *testing.T) {
parent := New()
child1 := parent.Child()
child2 := parent.Child()
child1.Add(1)
go runChild("child1", child1, 1*time.Second)
child2.Add(1)
go runChild("child2", child2, 2*time.Second)
time.Sleep(5 * time.Second)
parent.Stop()
parent.Wait()
if tracker != trackerExpected {
t.Errorf("Stopper is not waiting for children to finish, expected %d, got %d", trackerExpected, tracker)
}
}
func runChild(name string, child *Group, duration time.Duration) {
defer child.Done()
ticker := time.NewTicker(duration)
loop:
for {
select {
case <-ticker.C:
trackerExpected++
println(name + " start tick++")
time.Sleep(2 * duration)
tracker++
println(name + " end tick++")
continue loop
case <-child.Ch():
break loop
}
}
}