From 3956e0dc94c5f8e56ce8fe2f9aaf03a69e312848 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 3 Aug 2018 13:21:42 -0400 Subject: [PATCH] update synced videos map on progress add locking fix bugs --- ytsync/manager.go | 2 +- ytsync/setup.go | 2 ++ ytsync/ytsync.go | 26 ++++++++++++++++++++------ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 24017fa..35381b8 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -270,7 +270,7 @@ func (s SyncManager) Start() error { } SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) } - SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount) + SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i+1, len(syncs), syncCount) syncCount++ if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { shouldInterruptLoop = true diff --git a/ytsync/setup.go b/ytsync/setup.go index 042895d..f8c6441 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -44,7 +44,9 @@ func (s *Sync) walletSetup() error { return nil } + s.videosMapMux.Lock() numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... + s.videosMapMux.Unlock() log.Debugf("We already published %d videos", numPublished) if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 969a658..5deb71e 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -68,9 +68,20 @@ type Sync struct { syncedVideos map[string]syncedVideo grp *stop.Group - mux sync.Mutex - wg sync.WaitGroup - queue chan video + videosMapMux sync.Mutex + mux sync.Mutex + wg sync.WaitGroup + queue chan video +} + +func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { + s.videosMapMux.Lock() + defer s.videosMapMux.Unlock() + s.syncedVideos[videoID] = syncedVideo{ + VideoID: videoID, + Published: published, + FailureReason: failureReason, + } } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -184,7 +195,9 @@ func (s *Sync) FullCycle() (e error) { } s.db = redisdb.New() + s.videosMapMux.Lock() s.syncedVideos = syncedVideos + s.videosMapMux.Unlock() s.grp = stop.New() s.queue = make(chan video) @@ -343,6 +356,7 @@ func (s *Sync) startWorker(workerNum int) { if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } + s.AppendSyncedVideo(v.ID(), false, err.Error()) } break } @@ -496,7 +510,9 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) + s.videosMapMux.Lock() sv, ok := s.syncedVideos[v.ID()] + s.videosMapMux.Unlock() alreadyPublished := ok && sv.Published neverRetryFailures := []string{ @@ -535,12 +551,10 @@ func (s *Sync) processVideo(v video) (err error) { return err } err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") - if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) - } if err != nil { return err } + s.AppendSyncedVideo(v.ID(), true, "") return nil }