diff --git a/manager.go b/manager.go index 4d5e6cf..fd30074 100644 --- a/manager.go +++ b/manager.go @@ -96,7 +96,13 @@ type apiSyncUpdateResponse struct { Data null.String `json:"data"` } -func (s SyncManager) setChannelSyncStatus(channelID string, status string) error { +type syncedVideo struct { + VideoID string + Published bool + FailureReason string +} + +func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) { endpoint := s.ApiURL + "/yt/channel_status" res, _ := http.PostForm(endpoint, url.Values{ @@ -110,15 +116,28 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) error var response apiSyncUpdateResponse err := json.Unmarshal(body, &response) if err != nil { - return err + return nil, err } if !response.Error.IsNull() { - return errors.Err(response.Error.String) + return nil, errors.Err(response.Error.String) } - if !response.Data.IsNull() && response.Data.String == "ok" { - return nil + if !response.Data.IsNull() { + if response.Data.String == "ok" { + return nil, nil + } + var sv []syncedVideo + err := json.Unmarshal([]byte(response.Data.String), &sv) + if err != nil { + return nil, errors.Err("could not parse synced videos") + } + + svs := make(map[string]syncedVideo) + for _, v := range sv { + svs[v.VideoID] = v + } + return svs, nil } - return errors.Err("invalid API response. Status code: %d", res.StatusCode) + return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) } const ( diff --git a/setup.go b/setup.go index 0046d8a..f8cc8fe 100644 --- a/setup.go +++ b/setup.go @@ -44,7 +44,8 @@ func (s *Sync) walletSetup() error { return nil } - numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) + //numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) + numPublished := uint64(len(s.syncedVideos)) //should we only count published videos? Credits are allocated even for failed ones... if err != nil { return err } @@ -53,6 +54,11 @@ func (s *Sync) walletSetup() error { if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) { numOnSource = uint64(s.Manager.VideosLimit) } + + //TODO: get rid of this as soon as we compute this condition using the database in a more reliable way + if numPublished >= numOnSource { + return errors.Err("channel is already up to date") + } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount if numPublished > numOnSource { SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) diff --git a/ytsync.go b/ytsync.go index 0ee0056..362d115 100644 --- a/ytsync.go +++ b/ytsync.go @@ -21,7 +21,6 @@ import ( "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" @@ -64,9 +63,9 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - db *redisdb.DB - - grp *stop.Group + //db *redisdb.DB + syncedVideos map[string]syncedVideo + grp *stop.Group mux sync.Mutex wg sync.WaitGroup @@ -110,7 +109,7 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) + syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) if err != nil { return err } @@ -119,18 +118,19 @@ func (s *Sync) FullCycle() (e error) { //conditions for which a channel shouldn't be marked as failed noFailConditions := []string{ "this youtube channel is being managed by another server", + "channel is already up to date", } if util.SubstringInSlice(e.Error(), noFailConditions) { return } - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) + _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) e = errors.Prefix(err.Error(), e) } } else if !s.IsInterrupted() { - err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) + _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) if err != nil { e = err } @@ -181,7 +181,8 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } - s.db = redisdb.New() + //s.db = redisdb.New() + s.syncedVideos = syncedVideos s.grp = stop.New() s.queue = make(chan video) @@ -493,9 +494,12 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - alreadyPublished, err := s.db.IsPublished(v.ID()) - if err != nil { - return err + sv, ok := s.syncedVideos[v.ID()] + alreadyPublished := ok && sv.Published + + if ok && !sv.Published && strings.Contains(sv.FailureReason, "Error extracting sts from embedded url response") { + log.Println(v.ID() + " can't be published") + return nil } if alreadyPublished { @@ -515,7 +519,7 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - err = s.db.SetPublished(v.ID()) + //err = s.db.SetPublished(v.ID()) if err != nil { return err }