use stop group instead of wait group

refactoring
address reviews
This commit is contained in:
Niko Storni 2018-08-14 10:48:55 -04:00
parent 3e32372dbe
commit e08e1bd3a4
3 changed files with 50 additions and 51 deletions

View file

@ -136,12 +136,6 @@ const (
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
type apiVideoStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
endpoint := s.ApiURL + "/yt/video_status" endpoint := s.ApiURL + "/yt/video_status"
@ -165,7 +159,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
res, _ := http.PostForm(endpoint, vals) res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
var response apiVideoStatusResponse var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response) err := json.Unmarshal(body, &response)
if err != nil { if err != nil {
return err return err

View file

@ -14,8 +14,8 @@ import (
func (s *Sync) walletSetup() error { func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution //prevent unnecessary concurrent execution
s.mux.Lock() s.walletMux.Lock()
defer s.mux.Unlock() defer s.walletMux.Unlock()
err := s.ensureChannelOwnership() err := s.ensureChannelOwnership()
if err != nil { if err != nil {
return err return err
@ -30,27 +30,28 @@ func (s *Sync) walletSetup() error {
balance := decimal.Decimal(*balanceResp) balance := decimal.Decimal(*balanceResp)
log.Debugf("Starting balance is %s", balance.String()) log.Debugf("Starting balance is %s", balance.String())
var numOnSource uint64 var numOnSource int
if s.LbryChannelName == "@UCBerkeley" { if s.LbryChannelName == "@UCBerkeley" {
numOnSource = 10104 numOnSource = 10104
} else { } else {
numOnSource, err = s.CountVideos() n, err := s.CountVideos()
if err != nil { if err != nil {
return err return err
} }
numOnSource = int(n)
} }
log.Debugf("Source channel has %d videos", numOnSource) log.Debugf("Source channel has %d videos", numOnSource)
if numOnSource == 0 { if numOnSource == 0 {
return nil return nil
} }
s.videosMapMux.Lock() s.syncedVideosMux.Lock()
numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.videosMapMux.Unlock() s.syncedVideosMux.Unlock()
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already published %d videos", numPublished)
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { if numOnSource-numPublished > s.Manager.VideosLimit {
numOnSource = uint64(s.Manager.VideosLimit) numOnSource = s.Manager.VideosLimit
} }
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount

View file

@ -66,18 +66,17 @@ type Sync struct {
videoDirectory string videoDirectory string
db *redisdb.DB db *redisdb.DB
syncedVideos map[string]syncedVideo syncedVideos map[string]syncedVideo
syncedVideosMux *sync.Mutex
grp *stop.Group grp *stop.Group
lbryChannelID string lbryChannelID string
videosMapMux sync.Mutex walletMux *sync.Mutex
mux sync.Mutex
wg sync.WaitGroup
queue chan video queue chan video
} }
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
s.videosMapMux.Lock() s.syncedVideosMux.Lock()
defer s.videosMapMux.Unlock() defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{ s.syncedVideos[videoID] = syncedVideo{
VideoID: videoID, VideoID: videoID,
Published: published, Published: published,
@ -122,10 +121,25 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided") return errors.Err("channel ID not provided")
} }
s.syncedVideosMux = &sync.Mutex{}
s.walletMux = &sync.Mutex{}
s.db = redisdb.New()
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.syncedVideosMux.Unlock()
defer func() { defer func() {
if e != nil { if e != nil {
@ -194,21 +208,6 @@ func (s *Sync) FullCycle() (e error) {
return errors.Wrap(err, 0) return errors.Wrap(err, 0)
} }
s.db = redisdb.New()
s.videosMapMux.Lock()
s.syncedVideos = syncedVideos
s.videosMapMux.Unlock()
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
log.Printf("Starting daemon") log.Printf("Starting daemon")
err = startDaemonViaSystemd() err = startDaemonViaSystemd()
if err != nil { if err != nil {
@ -263,7 +262,11 @@ func (s *Sync) doSync() error {
} }
for i := 0; i < s.ConcurrentVideos; i++ { for i := 0; i < s.ConcurrentVideos; i++ {
go s.startWorker(i) s.grp.Add(1)
go func() {
defer s.grp.Done()
s.startWorker(i)
}()
} }
if s.LbryChannelName == "@UCBerkeley" { if s.LbryChannelName == "@UCBerkeley" {
@ -272,14 +275,11 @@ func (s *Sync) doSync() error {
err = s.enqueueYoutubeVideos() err = s.enqueueYoutubeVideos()
} }
close(s.queue) close(s.queue)
s.wg.Wait() s.grp.Wait()
return err return err
} }
func (s *Sync) startWorker(workerNum int) { func (s *Sync) startWorker(workerNum int) {
s.wg.Add(1)
defer s.wg.Done()
var v video var v video
var more bool var more bool
@ -510,9 +510,9 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String()) log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now()) }(time.Now())
s.videosMapMux.Lock() s.syncedVideosMux.Lock()
sv, ok := s.syncedVideos[v.ID()] sv, ok := s.syncedVideos[v.ID()]
s.videosMapMux.Unlock() s.syncedVideosMux.Unlock()
alreadyPublished := ok && sv.Published alreadyPublished := ok && sv.Published
neverRetryFailures := []string{ neverRetryFailures := []string{