From b3f7f93f5fa5a836a9b779ba5d128fae02ce172e Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 31 Jul 2018 13:42:20 -0400 Subject: [PATCH] refactor and fix api response handlers reinstate redis db to detect inconsistencies --- ytsync/manager.go | 38 +++++++++++++++++++------------------- ytsync/ytsync.go | 28 +++++++++++++++++++++------- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index fd30074..24017fa 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -90,19 +90,19 @@ func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { return response.Data, nil } -type apiSyncUpdateResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data null.String `json:"data"` +type apiChannelStatusResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data []syncedVideo `json:"data"` } type syncedVideo struct { - VideoID string - Published bool - FailureReason string + VideoID string `json:"video_id"` + Published bool `json:"published"` + FailureReason string `json:"failure_reason"` } -func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) { +func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { endpoint := s.ApiURL + "/yt/channel_status" res, _ := http.PostForm(endpoint, url.Values{ @@ -113,7 +113,7 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[ }) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiSyncUpdateResponse + var response apiChannelStatusResponse err := json.Unmarshal(body, &response) if err != nil { return nil, err @@ -121,18 +121,12 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[ if !response.Error.IsNull() { return nil, errors.Err(response.Error.String) } - if !response.Data.IsNull() { - if response.Data.String == "ok" { + if response.Data != nil { + if len(response.Data) == 0 { return nil, nil } - var sv []syncedVideo - err := json.Unmarshal([]byte(response.Data.String), &sv) - if err != nil { - return nil, errors.Err("could not parse synced videos") - } - svs := make(map[string]syncedVideo) - for _, v := range sv { + for _, v := range response.Data { svs[v.VideoID] = v } return svs, nil @@ -145,6 +139,12 @@ const ( VideoStatusFailed = "failed" ) +type apiVideoStatusResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data null.String `json:"data"` +} + func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { endpoint := s.ApiURL + "/yt/video_status" @@ -168,7 +168,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st res, _ := http.PostForm(endpoint, vals) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) - var response apiSyncUpdateResponse + var response apiVideoStatusResponse err := json.Unmarshal(body, &response) if err != nil { return err diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 362d115..b1a3ce2 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -21,6 +21,7 @@ import ( "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" + "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" @@ -63,9 +64,9 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - //db *redisdb.DB - syncedVideos map[string]syncedVideo - grp *stop.Group + db *redisdb.DB + syncedVideos map[string]syncedVideo + grp *stop.Group mux sync.Mutex wg sync.WaitGroup @@ -109,10 +110,11 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) + syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) if err != nil { return err } + defer func() { if e != nil { //conditions for which a channel shouldn't be marked as failed @@ -123,14 +125,14 @@ func (s *Sync) FullCycle() (e error) { if util.SubstringInSlice(e.Error(), noFailConditions) { return } - _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) e = errors.Prefix(err.Error(), e) } } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) if err != nil { e = err } @@ -181,7 +183,7 @@ func (s *Sync) FullCycle() (e error) { return errors.Wrap(err, 0) } - //s.db = redisdb.New() + s.db = redisdb.New() s.syncedVideos = syncedVideos s.grp = stop.New() s.queue = make(chan video) @@ -502,6 +504,18 @@ func (s *Sync) processVideo(v video) (err error) { return nil } + alreadyPublishedOld, err := s.db.IsPublished(v.ID()) + if err != nil { + return err + } + //TODO: remove this after a few runs + if alreadyPublishedOld && !alreadyPublished { + //seems like something in the migration of blobs didn't go perfectly right so warn about it! + SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s", + s.YoutubeChannelID, v.ID()) + return nil + } + if alreadyPublished { log.Println(v.ID() + " already published") return nil