remove redisDB dependency #25
3 changed files with 23 additions and 7 deletions
|
@ -270,7 +270,7 @@ func (s SyncManager) Start() error {
|
|||
}
|
||||
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
|
||||
}
|
||||
SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
|
||||
SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i+1, len(syncs), syncCount)
|
||||
syncCount++
|
||||
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
|
||||
shouldInterruptLoop = true
|
||||
|
|
|
@ -44,7 +44,9 @@ func (s *Sync) walletSetup() error {
|
|||
return nil
|
||||
}
|
||||
![]() Grins previous review outlined a mess with casts here and there to make simple math. Grins previous review outlined a mess with casts here and there to make simple math.
I changed everything to int as it's reasonable for the values they represent.
|
||||
|
||||
s.videosMapMux.Lock()
|
||||
![]() what is this?! not part of the PR but this is not good to have in the code base. what is this?! not part of the PR but this is not good to have in the code base.
![]() I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now I can explain the berkeley stuff to you via DM, it's all good as it will be eventually removed from here. Not worth changing now
|
||||
numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones...
|
||||
s.videosMapMux.Unlock()
|
||||
log.Debugf("We already published %d videos", numPublished)
|
||||
|
||||
if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
|
||||
|
|
|
@ -68,9 +68,20 @@ type Sync struct {
|
|||
syncedVideos map[string]syncedVideo
|
||||
grp *stop.Group
|
||||
|
||||
mux sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
queue chan video
|
||||
videosMapMux sync.Mutex
|
||||
mux sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
queue chan video
|
||||
}
|
||||
|
||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) {
|
||||
s.videosMapMux.Lock()
|
||||
defer s.videosMapMux.Unlock()
|
||||
s.syncedVideos[videoID] = syncedVideo{
|
||||
VideoID: videoID,
|
||||
Published: published,
|
||||
FailureReason: failureReason,
|
||||
}
|
||||
}
|
||||
|
||||
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
||||
|
@ -184,7 +195,9 @@ func (s *Sync) FullCycle() (e error) {
|
|||
}
|
||||
|
||||
s.db = redisdb.New()
|
||||
s.videosMapMux.Lock()
|
||||
s.syncedVideos = syncedVideos
|
||||
s.videosMapMux.Unlock()
|
||||
s.grp = stop.New()
|
||||
s.queue = make(chan video)
|
||||
|
||||
|
@ -343,6 +356,7 @@ func (s *Sync) startWorker(workerNum int) {
|
|||
if err != nil {
|
||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||
}
|
||||
s.AppendSyncedVideo(v.ID(), false, err.Error())
|
||||
}
|
||||
break
|
||||
}
|
||||
|
@ -496,7 +510,9 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
log.Println(v.ID() + " took " + time.Since(start).String())
|
||||
}(time.Now())
|
||||
|
||||
s.videosMapMux.Lock()
|
||||
sv, ok := s.syncedVideos[v.ID()]
|
||||
s.videosMapMux.Unlock()
|
||||
alreadyPublished := ok && sv.Published
|
||||
|
||||
neverRetryFailures := []string{
|
||||
|
@ -535,12 +551,10 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
return err
|
||||
}
|
||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
|
||||
if err != nil {
|
||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.AppendSyncedVideo(v.ID(), true, "")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue
why change this to an int? It better to be more specific than more general.