ADD support for stopper Wait propagation.
ADD unit tests for Wait and Stop for stopper children.
This commit is contained in:
parent
ef1b10b601
commit
bb7946a861
2 changed files with 96 additions and 8 deletions
|
@ -25,9 +25,10 @@ type Chan <-chan struct{}
|
||||||
|
|
||||||
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
|
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
|
||||||
type Group struct {
|
type Group struct {
|
||||||
sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
ctx context.Context
|
parents []*Group
|
||||||
cancel context.CancelFunc
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
|
||||||
mu *sync.Mutex
|
mu *sync.Mutex
|
||||||
waitingOn map[string]int
|
waitingOn map[string]int
|
||||||
|
@ -35,11 +36,11 @@ type Group struct {
|
||||||
type Stopper = Group
|
type Stopper = Group
|
||||||
|
|
||||||
// New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped.
|
// New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped.
|
||||||
func New(parent ...*Group) *Group {
|
func New(parents ...*Group) *Group {
|
||||||
s := &Group{}
|
s := &Group{parents: parents}
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
if len(parent) > 0 && parent[0] != nil {
|
if len(parents) > 0 && parents[0] != nil {
|
||||||
ctx = parent[0].ctx
|
ctx = parents[0].ctx
|
||||||
}
|
}
|
||||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||||
return s
|
return s
|
||||||
|
@ -67,7 +68,7 @@ func (s *Group) Stop() {
|
||||||
// StopAndWait is a convenience method to close the channel and wait for goroutines to return.
|
// StopAndWait is a convenience method to close the channel and wait for goroutines to return.
|
||||||
func (s *Group) StopAndWait() {
|
func (s *Group) StopAndWait() {
|
||||||
s.Stop()
|
s.Stop()
|
||||||
s.Wait()
|
s.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Child returns a new instance that will be stopped when s is stopped.
|
// Child returns a new instance that will be stopped when s is stopped.
|
||||||
|
@ -75,6 +76,28 @@ func (s *Group) Child() *Group {
|
||||||
return New(s)
|
return New(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Group) Add(delta int) {
|
||||||
|
s.wg.Add(delta)
|
||||||
|
for _, parent := range s.parents {
|
||||||
|
if parent != nil {
|
||||||
|
parent.Add(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Group) Wait() {
|
||||||
|
s.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Group) Done() {
|
||||||
|
s.wg.Done()
|
||||||
|
for _, parent := range s.parents {
|
||||||
|
if parent != nil {
|
||||||
|
parent.Done()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//AddNamed is the same as Add but will register the functional name of the routine for later output. See `DoneNamed`.
|
//AddNamed is the same as Add but will register the functional name of the routine for later output. See `DoneNamed`.
|
||||||
func (s *Group) AddNamed(delta int, name string) {
|
func (s *Group) AddNamed(delta int, name string) {
|
||||||
s.Add(delta)
|
s.Add(delta)
|
||||||
|
|
65
extras/stop/stop_test.go
Normal file
65
extras/stop/stop_test.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package stop
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStopParentChild(t *testing.T) {
|
||||||
|
stopper := New()
|
||||||
|
parent := stopper.Child()
|
||||||
|
child := New(parent)
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
go func() {
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
stopper.Stop()
|
||||||
|
}()
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
continue
|
||||||
|
case <-child.Ch():
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//Will run forever if stop is not propagated
|
||||||
|
}
|
||||||
|
|
||||||
|
var tracker int
|
||||||
|
var trackerExpected int
|
||||||
|
|
||||||
|
func TestWaitParentChild(t *testing.T) {
|
||||||
|
parent := New()
|
||||||
|
child1 := parent.Child()
|
||||||
|
child2 := parent.Child()
|
||||||
|
child1.Add(1)
|
||||||
|
go runChild("child1", child1, 1*time.Second)
|
||||||
|
child2.Add(1)
|
||||||
|
go runChild("child2", child2, 2*time.Second)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
parent.Stop()
|
||||||
|
parent.Wait()
|
||||||
|
if tracker != trackerExpected {
|
||||||
|
t.Errorf("Stopper is not waiting for children to finish, expected %d, got %d", trackerExpected, tracker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runChild(name string, child *Group, duration time.Duration) {
|
||||||
|
defer child.Done()
|
||||||
|
ticker := time.NewTicker(duration)
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
trackerExpected++
|
||||||
|
println(name + " start tick++")
|
||||||
|
time.Sleep(2 * duration)
|
||||||
|
tracker++
|
||||||
|
println(name + " end tick++")
|
||||||
|
continue loop
|
||||||
|
case <-child.Ch():
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue