minor refactor

This commit is contained in:
Alex Grintsvayg 2017-10-11 13:13:47 -04:00
parent 557c1ca8dc
commit f9f3ff1083

View file

@ -43,7 +43,7 @@ type Sync struct {
redisPool *redis.Pool redisPool *redis.Pool
} }
func (s *Sync) Go() error { func (s *Sync) init() error {
var err error var err error
s.redisPool = &redis.Pool{ s.redisPool = &redis.Pool{
@ -77,10 +77,28 @@ func (s *Sync) Go() error {
return errors.New("found blank claim address") return errors.New("found blank claim address")
} }
if s.LbryChannelName != "" {
err = s.ensureChannelOwnership()
if err != nil {
return err
}
}
return nil
}
func (s *Sync) Go() error {
var err error
err = s.init()
if err != nil {
return err
}
var wg sync.WaitGroup var wg sync.WaitGroup
videoQueue := make(chan video) videoQueue := make(chan video)
stopEnqueuing := make(chan struct{}) queueStopChan := make(chan struct{})
sendStopEnqueuing := sync.Once{} sendStopEnqueuing := sync.Once{}
var videoErrored atomic.Value var videoErrored atomic.Value
@ -89,13 +107,6 @@ func (s *Sync) Go() error {
log.Println("Will stop publishing if an error is detected") log.Println("Will stop publishing if an error is detected")
} }
if s.LbryChannelName != "" {
err = s.ensureChannelOwnership()
if err != nil {
return err
}
}
for i := 0; i < s.ConcurrentVideos; i++ { for i := 0; i < s.ConcurrentVideos; i++ {
go func() { go func() {
wg.Add(1) wg.Add(1)
@ -111,6 +122,8 @@ func (s *Sync) Go() error {
return return
} }
log.Println("========================================")
tryCount := 0 tryCount := 0
for { for {
tryCount++ tryCount++
@ -121,7 +134,7 @@ func (s *Sync) Go() error {
if s.StopOnError { if s.StopOnError {
videoErrored.Store(true) videoErrored.Store(true)
sendStopEnqueuing.Do(func() { sendStopEnqueuing.Do(func() {
stopEnqueuing <- struct{}{} queueStopChan <- struct{}{}
}) })
} else if s.MaxTries > 1 { } else if s.MaxTries > 1 {
if strings.Contains(err.Error(), "non 200 status code received") || if strings.Contains(err.Error(), "non 200 status code received") ||
@ -141,7 +154,7 @@ func (s *Sync) Go() error {
}() }()
} }
err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &stopEnqueuing) err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &queueStopChan)
close(videoQueue) close(videoQueue)
wg.Wait() wg.Wait()
return err return err
@ -185,7 +198,7 @@ func (s *Sync) ensureChannelOwnership() error {
return nil return nil
} }
func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, stopEnqueuing *chan struct{}) error { func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, queueStopChan *chan struct{}) error {
client := &http.Client{ client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
} }
@ -263,11 +276,12 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video,
sort.Sort(byPublishedAt(videos)) sort.Sort(byPublishedAt(videos))
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) //or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
Enqueue:
for _, v := range videos { for _, v := range videos {
select { select {
case *videoChan <- v: case *videoChan <- v:
case <-*stopEnqueuing: case <-*queueStopChan:
return nil break Enqueue
} }
} }
@ -275,7 +289,6 @@ func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video,
} }
func (s *Sync) processVideo(v video) error { func (s *Sync) processVideo(v video) error {
log.Println("========================================")
log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)")
conn := s.redisPool.Get() conn := s.redisPool.Get()