refactor and fix api response handlers

reinstate redis db to detect inconsistencies
This commit is contained in:
Niko Storni 2018-07-31 13:42:20 -04:00
parent dfb55cecb5
commit 9845ac1181
2 changed files with 40 additions and 26 deletions

View file

@ -90,19 +90,19 @@ func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
return response.Data, nil return response.Data, nil
} }
type apiSyncUpdateResponse struct { type apiChannelStatusResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Error null.String `json:"error"` Error null.String `json:"error"`
Data null.String `json:"data"` Data []syncedVideo `json:"data"`
} }
type syncedVideo struct { type syncedVideo struct {
VideoID string VideoID string `json:"video_id"`
Published bool Published bool `json:"published"`
FailureReason string FailureReason string `json:"failure_reason"`
} }
func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[string]syncedVideo, error) { func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) {
endpoint := s.ApiURL + "/yt/channel_status" endpoint := s.ApiURL + "/yt/channel_status"
res, _ := http.PostForm(endpoint, url.Values{ res, _ := http.PostForm(endpoint, url.Values{
@ -113,7 +113,7 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[
}) })
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
var response apiSyncUpdateResponse var response apiChannelStatusResponse
err := json.Unmarshal(body, &response) err := json.Unmarshal(body, &response)
if err != nil { if err != nil {
return nil, err return nil, err
@ -121,18 +121,12 @@ func (s SyncManager) setChannelSyncStatus(channelID string, status string) (map[
if !response.Error.IsNull() { if !response.Error.IsNull() {
return nil, errors.Err(response.Error.String) return nil, errors.Err(response.Error.String)
} }
if !response.Data.IsNull() { if response.Data != nil {
if response.Data.String == "ok" { if len(response.Data) == 0 {
return nil, nil return nil, nil
} }
var sv []syncedVideo
err := json.Unmarshal([]byte(response.Data.String), &sv)
if err != nil {
return nil, errors.Err("could not parse synced videos")
}
svs := make(map[string]syncedVideo) svs := make(map[string]syncedVideo)
for _, v := range sv { for _, v := range response.Data {
svs[v.VideoID] = v svs[v.VideoID] = v
} }
return svs, nil return svs, nil
@ -145,6 +139,12 @@ const (
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
type apiVideoStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error {
endpoint := s.ApiURL + "/yt/video_status" endpoint := s.ApiURL + "/yt/video_status"
@ -168,7 +168,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st
res, _ := http.PostForm(endpoint, vals) res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
var response apiSyncUpdateResponse var response apiVideoStatusResponse
err := json.Unmarshal(body, &response) err := json.Unmarshal(body, &response)
if err != nil { if err != nil {
return err return err

View file

@ -21,6 +21,7 @@ import (
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources" "github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -63,9 +64,9 @@ type Sync struct {
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
videoDirectory string videoDirectory string
//db *redisdb.DB db *redisdb.DB
syncedVideos map[string]syncedVideo syncedVideos map[string]syncedVideo
grp *stop.Group grp *stop.Group
mux sync.Mutex mux sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
@ -109,10 +110,11 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided") return errors.Err("channel ID not provided")
} }
syncedVideos, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing) syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil { if err != nil {
return err return err
} }
defer func() { defer func() {
if e != nil { if e != nil {
//conditions for which a channel shouldn't be marked as failed //conditions for which a channel shouldn't be marked as failed
@ -123,14 +125,14 @@ func (s *Sync) FullCycle() (e error) {
if util.SubstringInSlice(e.Error(), noFailConditions) { if util.SubstringInSlice(e.Error(), noFailConditions) {
return return
} }
_, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed) _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e) e = errors.Prefix(err.Error(), e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
_, err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced) _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced)
if err != nil { if err != nil {
e = err e = err
} }
@ -181,7 +183,7 @@ func (s *Sync) FullCycle() (e error) {
return errors.Wrap(err, 0) return errors.Wrap(err, 0)
} }
//s.db = redisdb.New() s.db = redisdb.New()
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.grp = stop.New() s.grp = stop.New()
s.queue = make(chan video) s.queue = make(chan video)
@ -502,6 +504,18 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
alreadyPublishedOld, err := s.db.IsPublished(v.ID())
if err != nil {
return err
}
//TODO: remove this after a few runs
if alreadyPublishedOld && !alreadyPublished {
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s",
s.YoutubeChannelID, v.ID())
return nil
}
if alreadyPublished { if alreadyPublished {
log.Println(v.ID() + " already published") log.Println(v.ID() + " already published")
return nil return nil