refactor stopgroup to use context, which allows parent/child cancelations

This commit is contained in:
Alex Grintsvayg 2018-06-25 16:13:28 -04:00
parent f0762e9c57
commit 821cfb748e
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
3 changed files with 61 additions and 63 deletions

49
stop/stop.go Normal file
View file

@ -0,0 +1,49 @@
package stop
import (
"context"
"sync"
)
// Chan is a receive-only channel
type Chan <-chan struct{}
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
type Group struct {
sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
type Stopper = Group
// New allocates and returns a new instance. Use New(parent) to create an instance that is stopped when parent is stopped.
func New(parent ...*Group) *Group {
s := &Group{}
ctx := context.Background()
if len(parent) > 0 && parent[0] != nil {
ctx = parent[0].ctx
}
s.ctx, s.cancel = context.WithCancel(ctx)
return s
}
// Ch returns a channel that will be closed when Stop is called.
func (s *Group) Ch() Chan {
return s.ctx.Done()
}
// Stop signals any listening processes to stop. After the first call, Stop() does nothing.
func (s *Group) Stop() {
s.cancel()
}
// StopAndWait is a convenience method to close the channel and wait for goroutines to return.
func (s *Group) StopAndWait() {
s.Stop()
s.Wait()
}
// Child returns a new instance that will be stopped when s is stopped.
func (s *Group) Child() *Group {
return New(s)
}

View file

@ -1,51 +0,0 @@
package stopOnce
import "sync"
// Chan is a receive-only channel
type Chan <-chan struct{}
// Stopper extends sync.WaitGroup to add a convenient way to stop running goroutines
type Stopper struct {
sync.WaitGroup
ch chan struct{}
once sync.Once
}
// New allocates and returns a new Stopper instance
func New() *Stopper {
s := &Stopper{}
s.ch = make(chan struct{})
s.once = sync.Once{}
return s
}
// Ch returns a channel that will be closed when Stop is called
func (s *Stopper) Ch() Chan {
return s.ch
}
// Stop closes the stopper channel. It is safe to call Stop many times. The channel will only be closed the first time.
func (s *Stopper) Stop() {
s.once.Do(func() {
close(s.ch)
})
}
// StopAndWait is a convenience method to close the channel and wait for goroutines to return
func (s *Stopper) StopAndWait() {
s.Stop()
s.Wait()
}
// Link will stop s if upstream is stopped.
// If you use Link, make sure you stop `s` when you're done with it. Otherwise this goroutine will be leaked.
func (s *Stopper) Link(upstream Chan) {
go func() {
select {
case <-upstream: // linked Stopper is stopped
s.Stop()
case <-s.Ch(): // this Stopper is stopped
}
}()
}

View file

@ -18,7 +18,7 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources"
@ -64,7 +64,7 @@ type Sync struct {
videoDirectory string
db *redisdb.DB
stop *stopOnce.Stopper
grp *stop.Group
wg sync.WaitGroup
queue chan video
@ -128,7 +128,7 @@ func (s *Sync) FullCycle() error {
}
s.db = redisdb.New()
s.stop = stopOnce.New()
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
@ -136,7 +136,7 @@ func (s *Sync) FullCycle() error {
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.stop.Stop()
s.grp.Stop()
}()
log.Printf("Starting daemon")
@ -152,7 +152,7 @@ func (s *Sync) FullCycle() error {
WaitForDaemonStart:
for {
select {
case <-s.stop.Ch():
case <-s.grp.Ch():
return nil
default:
_, err := s.daemon.WalletBalance()
@ -211,7 +211,7 @@ func (s *Sync) startWorker(workerNum int) {
for {
select {
case <-s.stop.Ch():
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
@ -222,7 +222,7 @@ func (s *Sync) startWorker(workerNum int) {
if !more {
return
}
case <-s.stop.Ch():
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
}
@ -237,7 +237,7 @@ func (s *Sync) startWorker(workerNum int) {
if err != nil {
log.Errorln("error processing video: " + err.Error())
if s.StopOnError {
s.stop.Stop()
s.grp.Stop()
} else if s.MaxTries > 1 {
if strings.Contains(err.Error(), "non 200 status code received") ||
strings.Contains(err.Error(), " reason: 'This video contains content from") ||
@ -333,14 +333,14 @@ func (s *Sync) enqueueYoutubeVideos() error {
Enqueue:
for _, v := range videos {
select {
case <-s.stop.Ch():
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.stop.Ch():
case <-s.grp.Ch():
break Enqueue
}
}
@ -382,14 +382,14 @@ func (s *Sync) enqueueUCBVideos() error {
Enqueue:
for _, v := range videos {
select {
case <-s.stop.Ch():
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.stop.Ch():
case <-s.grp.Ch():
break Enqueue
}
}