From c0be626ef2892c7beee6234321bd5bad805edf56 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 27 Apr 2018 09:14:23 -0400 Subject: [PATCH] add selfsync cmd add tests add more slack updates remove unneded test extract constants to own file --- cmd/constants.go | 20 +++++ cmd/selfsync.go | 195 +++++++++++++++++++++++++++++++++++++++++++ cmd/selfsync_test.go | 31 +++++++ cmd/ytsync.go | 39 ++------- cmd/ytsync_test.go | 9 -- 5 files changed, 255 insertions(+), 39 deletions(-) create mode 100644 cmd/constants.go create mode 100644 cmd/selfsync.go create mode 100644 cmd/selfsync_test.go delete mode 100644 cmd/ytsync_test.go diff --git a/cmd/constants.go b/cmd/constants.go new file mode 100644 index 0000000..a1fdef5 --- /dev/null +++ b/cmd/constants.go @@ -0,0 +1,20 @@ +package cmd + +const defaultMaxTries = 3 + +var ( + stopOnError bool + maxTries int + takeOverExistingChannel bool + refill int + limit int +) + +const ( + StatusPending = "pending" // waiting for permission to sync + StatusQueued = "queued" // in sync queue. will be synced soon + StatusSyncing = "syncing" // syncing now + StatusSynced = "synced" // done +) + +var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced} diff --git a/cmd/selfsync.go b/cmd/selfsync.go new file mode 100644 index 0000000..e2ea9db --- /dev/null +++ b/cmd/selfsync.go @@ -0,0 +1,195 @@ +package cmd + +import ( + "github.com/lbryio/lbry.go/errors" + sync "github.com/lbryio/lbry.go/ytsync" + + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "strings" + + "github.com/lbryio/lbry.go/null" + "github.com/lbryio/lbry.go/util" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + var selfSyncCmd = &cobra.Command{ + Use: "selfsync ", + Args: cobra.RangeArgs(2, 2), + Short: "Publish youtube channels into LBRY network automatically.", + Run: selfSync, + } + selfSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") + selfSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") + selfSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") + selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") + RootCmd.AddCommand(selfSyncCmd) +} + +type APIJobsResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data []APIYoutubeChannel `json:"data"` +} + +type APIYoutubeChannel struct { + ChannelId string `json:"channel_id"` + TotalVideos uint `json:"total_videos"` + DesiredChannelName string `json:"desired_channel_name"` + SyncServer null.String `json:"sync_server"` +} + +//PoC +func fetchChannels(authToken string) ([]APIYoutubeChannel, error) { + url := "http://localhost:8080/yt/jobs" + payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\nContent-Disposition: form-data; name=\"auth_token\"\r\n\r\n" + authToken + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--") + req, _ := http.NewRequest("POST", url, payload) + req.Header.Add("content-type", "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW") + res, _ := http.DefaultClient.Do(req) + defer res.Body.Close() + body, _ := ioutil.ReadAll(res.Body) + //fmt.Println(res) + //fmt.Println(string(body)) + var response APIJobsResponse + err := json.Unmarshal(body, &response) + if err != nil { + return nil, err + } + return response.Data, nil +} + +type APISyncUpdateResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data null.String `json:"data"` +} + +func setChannelSyncStatus(authToken string, channelID string, status string) error { + host, err := os.Hostname() + if err != nil { + return errors.Err("could not detect system hostname") + } + url := "http://localhost:8080/yt/sync_update" + payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\nContent-Disposition: form-data;" + + " name=\"channel_id\"\r\n\r\n" + channelID + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + + "Content-Disposition: form-data; name=\"sync_server\"\r\n\r\n" + host + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + + "Content-Disposition: form-data; name=\"auth_token\"\r\n\r\n" + authToken + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" + + "Content-Disposition: form-data; name=\"sync_status\"\r\n\r\n" + status + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--") + req, _ := http.NewRequest("POST", url, payload) + req.Header.Add("content-type", "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW") + req.Header.Add("Cache-Control", "no-cache") + res, _ := http.DefaultClient.Do(req) + defer res.Body.Close() + body, _ := ioutil.ReadAll(res.Body) + //fmt.Println(res) + //fmt.Println(string(body)) + var response APISyncUpdateResponse + err = json.Unmarshal(body, &response) + if err != nil { + return err + } + if !response.Error.IsNull() { + return errors.Err(response.Error.String) + } + if !response.Data.IsNull() && response.Data.String == "ok" { + return nil + } + return errors.Err("invalid API response") +} + +func selfSync(cmd *cobra.Command, args []string) { + slackToken := os.Getenv("SLACK_TOKEN") + if slackToken == "" { + log.Error("A slack token was not present in env vars! Slack messages disabled!") + } else { + util.InitSlack(os.Getenv("SLACK_TOKEN")) + } + + ytAPIKey := args[0] + authToken := args[1] + + if stopOnError && maxTries != defaultMaxTries { + log.Errorln("--stop-on-error and --max-tries are mutually exclusive") + return + } + if maxTries < 1 { + log.Errorln("setting --max-tries less than 1 doesn't make sense") + return + } + + if limit < 0 { + log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense") + return + } + channelsToSync, err := fetchChannels(authToken) + if err != nil { + msg := fmt.Sprintf("failed to fetch channels: %v", err) + log.Errorln(msg) + util.SendToSlack(msg) + return + } + + for loops := 0; loops < len(channelsToSync); loops++ { + //avoid dereferencing + channel := channelsToSync[loops] + channelID := channel.ChannelId + lbryChannelName := channel.DesiredChannelName + if channel.TotalVideos < 1 { + msg := fmt.Sprintf("Channnel %s has no videos. Skipping", lbryChannelName) + util.SendToSlack(msg) + log.Debugln(msg) + continue + } + if !channel.SyncServer.IsNull() { + msg := fmt.Sprintf("Channnel %s is being synced by another server: %s", lbryChannelName, channel.SyncServer.String) + util.SendToSlack(msg) + log.Debugln(msg) + continue + } + + //acquire the lock on the channel + err := setChannelSyncStatus(authToken, channelID, StatusSyncing) + if err != nil { + msg := fmt.Sprintf("Failed aquiring sync rights for channel %s: %v", lbryChannelName, err) + util.SendToSlack(msg) + log.Debugln(msg) + continue + } + msg := fmt.Sprintf("Syncing %s to LBRY! (iteration %d)", lbryChannelName, loops) + util.SendToSlack(msg) + log.Debugln(msg) + + s := sync.Sync{ + YoutubeAPIKey: ytAPIKey, + YoutubeChannelID: channelID, + LbryChannelName: lbryChannelName, + StopOnError: stopOnError, + MaxTries: maxTries, + ConcurrentVideos: 1, + TakeOverExistingChannel: takeOverExistingChannel, + Refill: refill, + } + + err = s.FullCycle() + util.SendToSlack("Syncing " + lbryChannelName + " reached an end.") + if err != nil { + log.Error(errors.FullTrace(err)) + util.SendToSlack(errors.FullTrace(err)) + break + } + + if limit != 0 && loops >= limit { + msg := fmt.Sprintf("limit of %d reached! Stopping", limit) + util.SendToSlack(msg) + log.Debugln(msg) + break + } + } + util.SendToSlack("Syncing process terminated!") + log.Debugln("Syncing process terminated!") +} diff --git a/cmd/selfsync_test.go b/cmd/selfsync_test.go new file mode 100644 index 0000000..8099217 --- /dev/null +++ b/cmd/selfsync_test.go @@ -0,0 +1,31 @@ +package cmd + +import ( + "fmt" + "testing" +) + +func TestFetchChannels(t *testing.T) { + res, err := fetchChannels("620280") + if err != nil { + t.Error(err) + } + if res == nil { + t.Error("empty response") + } + fmt.Println(res) +} + +// warning this test will actually set sync_server on the db entry for this test channel (mine) +// such field should be reset to null if the test must be run on a different machine (different hostname) +// and obviously the auth token must be appropriate +func TestSetChannelSyncStatus(t *testing.T) { + err := setChannelSyncStatus("620280", "UCNQfQvFMPnInwsU_iGYArJQ", StatusSyncing) + if err != nil { + t.Error(err) + } + err = setChannelSyncStatus("620280", "UCNQfQvFMPnInwsU_iGYArJQ", StatusQueued) + if err != nil { + t.Error(err) + } +} diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 2187bc9..a20bbfd 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -4,14 +4,11 @@ import ( "github.com/lbryio/lbry.go/errors" sync "github.com/lbryio/lbry.go/ytsync" - "fmt" + "os" + "github.com/lbryio/lbry.go/util" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "io/ioutil" - "net/http" - "os" - "strings" ) func init() { @@ -26,37 +23,15 @@ func init() { ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") ytSyncCmd.Flags().IntVar(&refill, "refill", 0, "Also add this many credits to the wallet") RootCmd.AddCommand(ytSyncCmd) +} + +func ytsync(cmd *cobra.Command, args []string) { slackToken := os.Getenv("SLACK_TOKEN") if slackToken == "" { log.Error("A slack token was not present in env vars! Slack messages disabled!") } else { util.InitSlack(os.Getenv("SLACK_TOKEN")) } -} - -const defaultMaxTries = 3 - -var ( - stopOnError bool - maxTries int - takeOverExistingChannel bool - refill int -) - -//PoC -func fetchChannels() { - url := "http://localhost:8080/yt/jobs" - payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\nContent-Disposition: form-data; name=\"auth_token\"\r\n\r\n620280\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--") - req, _ := http.NewRequest("POST", url, payload) - req.Header.Add("content-type", "multipart/form-data") - res, _ := http.DefaultClient.Do(req) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - fmt.Println(res) - fmt.Println(string(body)) -} - -func ytsync(cmd *cobra.Command, args []string) { ytAPIKey := args[0] lbryChannelName := args[1] if string(lbryChannelName[0]) != "@" { @@ -77,6 +52,7 @@ func ytsync(cmd *cobra.Command, args []string) { log.Errorln("setting --max-tries less than 1 doesn't make sense") return } + util.SendToSlack("Syncing " + lbryChannelName + " to LBRY!") s := sync.Sync{ YoutubeAPIKey: ytAPIKey, @@ -93,5 +69,8 @@ func ytsync(cmd *cobra.Command, args []string) { if err != nil { log.Error(errors.FullTrace(err)) + util.SendToSlack(errors.FullTrace(err)) + } + util.SendToSlack("Syncing " + lbryChannelName + " reached an end.") } diff --git a/cmd/ytsync_test.go b/cmd/ytsync_test.go deleted file mode 100644 index c539185..0000000 --- a/cmd/ytsync_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package cmd - -import ( - "testing" -) - -func TestFetchChannels(t *testing.T) { - fetchChannels() -}