diff --git a/ytsync.go b/ytsync.go index 3c37d74..999028a 100644 --- a/ytsync.go +++ b/ytsync.go @@ -43,7 +43,7 @@ type Sync struct { redisPool *redis.Pool } -func (s *Sync) Go() error { +func (s *Sync) init() error { var err error s.redisPool = &redis.Pool{ @@ -77,10 +77,28 @@ func (s *Sync) Go() error { return errors.New("found blank claim address") } + if s.LbryChannelName != "" { + err = s.ensureChannelOwnership() + if err != nil { + return err + } + } + + return nil +} + +func (s *Sync) Go() error { + var err error + + err = s.init() + if err != nil { + return err + } + var wg sync.WaitGroup videoQueue := make(chan video) - stopEnqueuing := make(chan struct{}) + queueStopChan := make(chan struct{}) sendStopEnqueuing := sync.Once{} var videoErrored atomic.Value @@ -89,13 +107,6 @@ func (s *Sync) Go() error { log.Println("Will stop publishing if an error is detected") } - if s.LbryChannelName != "" { - err = s.ensureChannelOwnership() - if err != nil { - return err - } - } - for i := 0; i < s.ConcurrentVideos; i++ { go func() { wg.Add(1) @@ -111,6 +122,8 @@ func (s *Sync) Go() error { return } + log.Println("========================================") + tryCount := 0 for { tryCount++ @@ -121,7 +134,7 @@ func (s *Sync) Go() error { if s.StopOnError { videoErrored.Store(true) sendStopEnqueuing.Do(func() { - stopEnqueuing <- struct{}{} + queueStopChan <- struct{}{} }) } else if s.MaxTries > 1 { if strings.Contains(err.Error(), "non 200 status code received") || @@ -141,7 +154,7 @@ func (s *Sync) Go() error { }() } - err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &stopEnqueuing) + err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &queueStopChan) close(videoQueue) wg.Wait() return err @@ -185,7 +198,7 @@ func (s *Sync) ensureChannelOwnership() error { return nil } -func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, stopEnqueuing *chan struct{}) error { +func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, queueStopChan *chan struct{}) error { client := &http.Client{ Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, } @@ -263,11 +276,12 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, sort.Sort(byPublishedAt(videos)) //or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) +Enqueue: for _, v := range videos { select { case *videoChan <- v: - case <-*stopEnqueuing: - return nil + case <-*queueStopChan: + break Enqueue } } @@ -275,7 +289,6 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, } func (s *Sync) processVideo(v video) error { - log.Println("========================================") log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") conn := s.redisPool.Get()