update synced videos map on progress

add locking
fix bugs
Niko Storni 2018-08-03 13:21:42 -04:00
parent e15476b4f8
commit 24aca302ff
3 changed files with 23 additions and 7 deletions
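The diff below guards every access to the shared syncedVideos map with a new videosMapMux mutex and routes writes through an AppendSyncedVideo helper, so worker goroutines can record per-video results concurrently without racing on the map. As a minimal, self-contained sketch of that pattern (the tracker type and its names below are illustrative stand-ins, not types from this repository):

// Minimal sketch of the locking pattern this commit applies: a mutex-guarded
// map updated from several goroutines. Names here are illustrative only.
package main

import (
	"fmt"
	"sync"
)

type syncedVideo struct {
	VideoID       string
	Published     bool
	FailureReason string
}

type tracker struct {
	mu     sync.Mutex
	videos map[string]syncedVideo
}

// append records a video's outcome; the mutex makes it safe to call
// from concurrent workers.
func (t *tracker) append(videoID string, published bool, failureReason string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.videos[videoID] = syncedVideo{VideoID: videoID, Published: published, FailureReason: failureReason}
}

func main() {
	t := &tracker{videos: make(map[string]syncedVideo)}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			t.append(fmt.Sprintf("video-%d", n), n%2 == 0, "")
		}(i)
	}
	wg.Wait()
	fmt.Println(len(t.videos), "videos recorded")
}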


@@ -270,7 +270,7 @@ func (s SyncManager) Start() error {
 			}
 			SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
 		}
-		SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
+		SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i+1, len(syncs), syncCount)
 		syncCount++
 		if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
 			shouldInterruptLoop = true


@@ -44,7 +44,9 @@ func (s *Sync) walletSetup() error {
 		return nil
 	}
+	s.videosMapMux.Lock()
 	numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones...
+	s.videosMapMux.Unlock()
 	log.Debugf("We already published %d videos", numPublished)
 	if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {


@@ -68,9 +68,20 @@ type Sync struct {
 	syncedVideos map[string]syncedVideo
 	grp          *stop.Group
-	mux   sync.Mutex
-	wg    sync.WaitGroup
-	queue chan video
+	videosMapMux sync.Mutex
+	mux          sync.Mutex
+	wg           sync.WaitGroup
+	queue        chan video
+}
+
+func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
+	s.videosMapMux.Lock()
+	defer s.videosMapMux.Unlock()
+	s.syncedVideos[videoID] = syncedVideo{
+		VideoID:       videoID,
+		Published:     published,
+		FailureReason: failureReason,
+	}
 }

 // SendErrorToSlack Sends an error message to the default channel and to the process log.
@@ -184,7 +195,9 @@ func (s *Sync) FullCycle() (e error) {
 	}
 	s.db = redisdb.New()
+	s.videosMapMux.Lock()
 	s.syncedVideos = syncedVideos
+	s.videosMapMux.Unlock()
 	s.grp = stop.New()
 	s.queue = make(chan video)
@@ -343,6 +356,7 @@ func (s *Sync) startWorker(workerNum int) {
 				if err != nil {
 					SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
 				}
+				s.AppendSyncedVideo(v.ID(), false, err.Error())
 			}
 			break
 		}
@@ -496,7 +510,9 @@ func (s *Sync) processVideo(v video) (err error) {
 		log.Println(v.ID() + " took " + time.Since(start).String())
 	}(time.Now())

+	s.videosMapMux.Lock()
 	sv, ok := s.syncedVideos[v.ID()]
+	s.videosMapMux.Unlock()
 	alreadyPublished := ok && sv.Published

 	neverRetryFailures := []string{
@@ -535,12 +551,10 @@ func (s *Sync) processVideo(v video) (err error) {
 		return err
 	}

 	err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
-	if err != nil {
-		SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
-	}
 	if err != nil {
 		return err
 	}
+	s.AppendSyncedVideo(v.ID(), true, "")
 	return nil
 }
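The read path in processVideo still takes videosMapMux inline around the map lookup. A possible refinement, not part of this commit, would be a read-side counterpart to AppendSyncedVideo (sketched below with a hypothetical name), or switching videosMapMux to a sync.RWMutex so concurrent readers do not serialize on each other.

// Hypothetical read helper in the style of AppendSyncedVideo; not in the diff above.
func (s *Sync) getSyncedVideo(videoID string) (syncedVideo, bool) {
	s.videosMapMux.Lock()
	defer s.videosMapMux.Unlock()
	sv, ok := s.syncedVideos[videoID]
	return sv, ok
}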