From 65ed9d7b4b6a1bc0803c9357ad77178689f99b1c Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Mon, 25 Jun 2018 16:13:28 -0400 Subject: [PATCH] refactor stopgroup to use context, which allows parent/child cancelations --- ytsync.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ytsync.go b/ytsync.go index 7cb7469..3dc9cee 100644 --- a/ytsync.go +++ b/ytsync.go @@ -18,7 +18,7 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" @@ -64,7 +64,7 @@ type Sync struct { videoDirectory string db *redisdb.DB - stop *stopOnce.Stopper + grp *stop.Group wg sync.WaitGroup queue chan video @@ -128,7 +128,7 @@ func (s *Sync) FullCycle() error { } s.db = redisdb.New() - s.stop = stopOnce.New() + s.grp = stop.New() s.queue = make(chan video) interruptChan := make(chan os.Signal, 1) @@ -136,7 +136,7 @@ func (s *Sync) FullCycle() error { go func() { <-interruptChan log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") - s.stop.Stop() + s.grp.Stop() }() log.Printf("Starting daemon") @@ -152,7 +152,7 @@ func (s *Sync) FullCycle() error { WaitForDaemonStart: for { select { - case <-s.stop.Ch(): + case <-s.grp.Ch(): return nil default: _, err := s.daemon.WalletBalance() @@ -211,7 +211,7 @@ func (s *Sync) startWorker(workerNum int) { for { select { - case <-s.stop.Ch(): + case <-s.grp.Ch(): log.Printf("Stopping worker %d", workerNum) return default: @@ -222,7 +222,7 @@ func (s *Sync) startWorker(workerNum int) { if !more { return } - case <-s.stop.Ch(): + case <-s.grp.Ch(): log.Printf("Stopping worker %d", workerNum) return } @@ -237,7 +237,7 @@ func (s *Sync) startWorker(workerNum int) { if err != nil { log.Errorln("error processing video: " + err.Error()) if s.StopOnError { - s.stop.Stop() + s.grp.Stop() } else if s.MaxTries > 1 { if strings.Contains(err.Error(), "non 200 status code received") || strings.Contains(err.Error(), " reason: 'This video contains content from") || @@ -333,14 +333,14 @@ func (s *Sync) enqueueYoutubeVideos() error { Enqueue: for _, v := range videos { select { - case <-s.stop.Ch(): + case <-s.grp.Ch(): break Enqueue default: } select { case s.queue <- v: - case <-s.stop.Ch(): + case <-s.grp.Ch(): break Enqueue } } @@ -382,14 +382,14 @@ func (s *Sync) enqueueUCBVideos() error { Enqueue: for _, v := range videos { select { - case <-s.stop.Ch(): + case <-s.grp.Ch(): break Enqueue default: } select { case s.queue <- v: - case <-s.stop.Ch(): + case <-s.grp.Ch(): break Enqueue } }