From bb7946a8618a3a4d8b1e3b98935b84fd4eeb28c7 Mon Sep 17 00:00:00 2001 From: Mark Beamer Jr Date: Sat, 9 Nov 2019 12:25:33 -0500 Subject: [PATCH] ADD support for stopper Wait propagation. ADD unit tests for Wait and Stop for stopper children. --- extras/stop/stop.go | 39 +++++++++++++++++++----- extras/stop/stop_test.go | 65 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 extras/stop/stop_test.go diff --git a/extras/stop/stop.go b/extras/stop/stop.go index c599ec1..7cb1c42 100644 --- a/extras/stop/stop.go +++ b/extras/stop/stop.go @@ -25,9 +25,10 @@ type Chan <-chan struct{} // Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines type Group struct { - sync.WaitGroup - ctx context.Context - cancel context.CancelFunc + wg sync.WaitGroup + parents []*Group + ctx context.Context + cancel context.CancelFunc mu *sync.Mutex waitingOn map[string]int @@ -35,11 +36,11 @@ type Group struct { type Stopper = Group // New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped. -func New(parent ...*Group) *Group { - s := &Group{} +func New(parents ...*Group) *Group { + s := &Group{parents: parents} ctx := context.Background() - if len(parent) > 0 && parent[0] != nil { - ctx = parent[0].ctx + if len(parents) > 0 && parents[0] != nil { + ctx = parents[0].ctx } s.ctx, s.cancel = context.WithCancel(ctx) return s @@ -67,7 +68,7 @@ func (s *Group) Stop() { // StopAndWait is a convenience method to close the channel and wait for goroutines to return. func (s *Group) StopAndWait() { s.Stop() - s.Wait() + s.wg.Wait() } // Child returns a new instance that will be stopped when s is stopped. @@ -75,6 +76,28 @@ func (s *Group) Child() *Group { return New(s) } +func (s *Group) Add(delta int) { + s.wg.Add(delta) + for _, parent := range s.parents { + if parent != nil { + parent.Add(1) + } + } +} + +func (s *Group) Wait() { + s.wg.Wait() +} + +func (s *Group) Done() { + s.wg.Done() + for _, parent := range s.parents { + if parent != nil { + parent.Done() + } + } +} + //AddNamed is the same as Add but will register the functional name of the routine for later output. See `DoneNamed`. func (s *Group) AddNamed(delta int, name string) { s.Add(delta) diff --git a/extras/stop/stop_test.go b/extras/stop/stop_test.go new file mode 100644 index 0000000..39dc28a --- /dev/null +++ b/extras/stop/stop_test.go @@ -0,0 +1,65 @@ +package stop + +import ( + "testing" + "time" +) + +func TestStopParentChild(t *testing.T) { + stopper := New() + parent := stopper.Child() + child := New(parent) + ticker := time.NewTicker(1 * time.Second) + go func() { + time.Sleep(3 * time.Second) + stopper.Stop() + }() +loop: + for { + select { + case <-ticker.C: + continue + case <-child.Ch(): + break loop + } + } + //Will run forever if stop is not propagated +} + +var tracker int +var trackerExpected int + +func TestWaitParentChild(t *testing.T) { + parent := New() + child1 := parent.Child() + child2 := parent.Child() + child1.Add(1) + go runChild("child1", child1, 1*time.Second) + child2.Add(1) + go runChild("child2", child2, 2*time.Second) + time.Sleep(5 * time.Second) + parent.Stop() + parent.Wait() + if tracker != trackerExpected { + t.Errorf("Stopper is not waiting for children to finish, expected %d, got %d", trackerExpected, tracker) + } +} + +func runChild(name string, child *Group, duration time.Duration) { + defer child.Done() + ticker := time.NewTicker(duration) +loop: + for { + select { + case <-ticker.C: + trackerExpected++ + println(name + " start tick++") + time.Sleep(2 * duration) + tracker++ + println(name + " end tick++") + continue loop + case <-child.Ch(): + break loop + } + } +} -- 2.45.2