refactor stopgroup to use context, which allows parent/child cancelations

This commit is contained in:
Alex Grintsvayg 2018-06-25 16:13:28 -04:00
parent a3154d80e1
commit 65ed9d7b4b

View file

@ -18,7 +18,7 @@ import (
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources" "github.com/lbryio/lbry.go/ytsync/sources"
@ -64,7 +64,7 @@ type Sync struct {
videoDirectory string videoDirectory string
db *redisdb.DB db *redisdb.DB
stop *stopOnce.Stopper grp *stop.Group
wg sync.WaitGroup wg sync.WaitGroup
queue chan video queue chan video
@ -128,7 +128,7 @@ func (s *Sync) FullCycle() error {
} }
s.db = redisdb.New() s.db = redisdb.New()
s.stop = stopOnce.New() s.grp = stop.New()
s.queue = make(chan video) s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
@ -136,7 +136,7 @@ func (s *Sync) FullCycle() error {
go func() { go func() {
<-interruptChan <-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.stop.Stop() s.grp.Stop()
}() }()
log.Printf("Starting daemon") log.Printf("Starting daemon")
@ -152,7 +152,7 @@ func (s *Sync) FullCycle() error {
WaitForDaemonStart: WaitForDaemonStart:
for { for {
select { select {
case <-s.stop.Ch(): case <-s.grp.Ch():
return nil return nil
default: default:
_, err := s.daemon.WalletBalance() _, err := s.daemon.WalletBalance()
@ -211,7 +211,7 @@ func (s *Sync) startWorker(workerNum int) {
for { for {
select { select {
case <-s.stop.Ch(): case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum) log.Printf("Stopping worker %d", workerNum)
return return
default: default:
@ -222,7 +222,7 @@ func (s *Sync) startWorker(workerNum int) {
if !more { if !more {
return return
} }
case <-s.stop.Ch(): case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum) log.Printf("Stopping worker %d", workerNum)
return return
} }
@ -237,7 +237,7 @@ func (s *Sync) startWorker(workerNum int) {
if err != nil { if err != nil {
log.Errorln("error processing video: " + err.Error()) log.Errorln("error processing video: " + err.Error())
if s.StopOnError { if s.StopOnError {
s.stop.Stop() s.grp.Stop()
} else if s.MaxTries > 1 { } else if s.MaxTries > 1 {
if strings.Contains(err.Error(), "non 200 status code received") || if strings.Contains(err.Error(), "non 200 status code received") ||
strings.Contains(err.Error(), " reason: 'This video contains content from") || strings.Contains(err.Error(), " reason: 'This video contains content from") ||
@ -333,14 +333,14 @@ func (s *Sync) enqueueYoutubeVideos() error {
Enqueue: Enqueue:
for _, v := range videos { for _, v := range videos {
select { select {
case <-s.stop.Ch(): case <-s.grp.Ch():
break Enqueue break Enqueue
default: default:
} }
select { select {
case s.queue <- v: case s.queue <- v:
case <-s.stop.Ch(): case <-s.grp.Ch():
break Enqueue break Enqueue
} }
} }
@ -382,14 +382,14 @@ func (s *Sync) enqueueUCBVideos() error {
Enqueue: Enqueue:
for _, v := range videos { for _, v := range videos {
select { select {
case <-s.stop.Ch(): case <-s.grp.Ch():
break Enqueue break Enqueue
default: default:
} }
select { select {
case s.queue <- v: case s.queue <- v:
case <-s.stop.Ch(): case <-s.grp.Ch():
break Enqueue break Enqueue
} }
} }