add clean interruption
change how post requests are made add better support for channel updates
This commit is contained in:
parent
c8eebbfae1
commit
744021cfbc
4 changed files with 48 additions and 43 deletions
|
@ -9,6 +9,8 @@ var (
|
||||||
refill int
|
refill int
|
||||||
limit int
|
limit int
|
||||||
skipSpaceCheck bool
|
skipSpaceCheck bool
|
||||||
|
syncUpdate bool
|
||||||
|
syncStatus string
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
@ -6,10 +6,10 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"os/user"
|
"os/user"
|
||||||
|
|
||||||
|
url2 "net/url"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
"github.com/lbryio/lbry.go/null"
|
"github.com/lbryio/lbry.go/null"
|
||||||
"github.com/lbryio/lbry.go/util"
|
"github.com/lbryio/lbry.go/util"
|
||||||
|
@ -20,8 +20,8 @@ import (
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
var selfSyncCmd = &cobra.Command{
|
var selfSyncCmd = &cobra.Command{
|
||||||
Use: "selfsync <youtube_api_key> <auth_token> [<sync_status>]",
|
Use: "selfsync <youtube_api_key> <auth_token>",
|
||||||
Args: cobra.RangeArgs(2, 3),
|
Args: cobra.RangeArgs(2, 2),
|
||||||
Short: "Publish youtube channels into LBRY network automatically.",
|
Short: "Publish youtube channels into LBRY network automatically.",
|
||||||
Run: selfSync,
|
Run: selfSync,
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,8 @@ func init() {
|
||||||
selfSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
selfSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
||||||
selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
|
selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
|
||||||
selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
||||||
selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "update", false, "Update previously synced channels instead of syncing new ones")
|
selfSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)")
|
||||||
|
selfSyncCmd.Flags().StringVar(&syncStatus, "status", StatusQueued, "Specify which queue to pull from. (Default: queued)")
|
||||||
|
|
||||||
RootCmd.AddCommand(selfSyncCmd)
|
RootCmd.AddCommand(selfSyncCmd)
|
||||||
}
|
}
|
||||||
|
@ -48,19 +49,14 @@ type APIYoutubeChannel struct {
|
||||||
SyncServer null.String `json:"sync_server"`
|
SyncServer null.String `json:"sync_server"`
|
||||||
}
|
}
|
||||||
|
|
||||||
//PoC
|
|
||||||
func fetchChannels(authToken string, status string) ([]APIYoutubeChannel, error) {
|
func fetchChannels(authToken string, status string) ([]APIYoutubeChannel, error) {
|
||||||
url := "https://api.lbry.io/yt/jobs"
|
url := "http://localhost:8080/yt/jobs"
|
||||||
payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" +
|
res, _ := http.PostForm(url, url2.Values{
|
||||||
"Content-Disposition: form-data; name=\"auth_token\"\r\n\r\n" + authToken + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" +
|
"auth_token": {authToken},
|
||||||
"Content-Disposition: form-data; name=\"sync_status\"\r\n\r\n" + status + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--")
|
"sync_status": {status},
|
||||||
req, _ := http.NewRequest("POST", url, payload)
|
})
|
||||||
req.Header.Add("content-type", "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW")
|
|
||||||
res, _ := http.DefaultClient.Do(req)
|
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
//fmt.Println(res)
|
|
||||||
//fmt.Println(string(body))
|
|
||||||
var response APIJobsResponse
|
var response APIJobsResponse
|
||||||
err := json.Unmarshal(body, &response)
|
err := json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -80,20 +76,16 @@ func setChannelSyncStatus(authToken string, channelID string, status string) err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Err("could not detect system hostname")
|
return errors.Err("could not detect system hostname")
|
||||||
}
|
}
|
||||||
url := "https://api.lbry.io/yt/sync_update"
|
url := "http://localhost:8080/yt/sync_update"
|
||||||
payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\nContent-Disposition: form-data;" +
|
|
||||||
" name=\"channel_id\"\r\n\r\n" + channelID + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" +
|
res, _ := http.PostForm(url, url2.Values{
|
||||||
"Content-Disposition: form-data; name=\"sync_server\"\r\n\r\n" + host + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" +
|
"channel_id": {channelID},
|
||||||
"Content-Disposition: form-data; name=\"auth_token\"\r\n\r\n" + authToken + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW\r\n" +
|
"sync_server": {host},
|
||||||
"Content-Disposition: form-data; name=\"sync_status\"\r\n\r\n" + status + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--")
|
"auth_token": {authToken},
|
||||||
req, _ := http.NewRequest("POST", url, payload)
|
"sync_status": {status},
|
||||||
req.Header.Add("content-type", "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW")
|
})
|
||||||
req.Header.Add("Cache-Control", "no-cache")
|
|
||||||
res, _ := http.DefaultClient.Do(req)
|
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
//fmt.Println(res)
|
|
||||||
//fmt.Println(string(body))
|
|
||||||
var response APISyncUpdateResponse
|
var response APISyncUpdateResponse
|
||||||
err = json.Unmarshal(body, &response)
|
err = json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -125,7 +117,7 @@ func selfSync(cmd *cobra.Command, args []string) {
|
||||||
util.SendToSlackError(err.Error())
|
util.SendToSlackError(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if usedPctile > 0.9 && !skipSpaceCheck {
|
if usedPctile > 0.90 && !skipSpaceCheck {
|
||||||
util.SendToSlackError("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)
|
util.SendToSlackError("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -134,14 +126,12 @@ func selfSync(cmd *cobra.Command, args []string) {
|
||||||
ytAPIKey := args[0]
|
ytAPIKey := args[0]
|
||||||
authToken := args[1]
|
authToken := args[1]
|
||||||
|
|
||||||
status := StatusQueued
|
if !util.InSlice(syncStatus, SyncStatuses) {
|
||||||
if len(args) > 2 {
|
log.Errorf("status must be one of the following: %v\n", SyncStatuses)
|
||||||
if util.InSlice(args[2], SyncStatuses) {
|
return
|
||||||
status = args[2]
|
}
|
||||||
} else {
|
if syncUpdate {
|
||||||
log.Errorf("status must be one of the following: %v\n", SyncStatuses)
|
syncStatus = StatusSynced
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if stopOnError && maxTries != defaultMaxTries {
|
if stopOnError && maxTries != defaultMaxTries {
|
||||||
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
|
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
|
||||||
|
@ -156,7 +146,7 @@ func selfSync(cmd *cobra.Command, args []string) {
|
||||||
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
|
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
channelsToSync, err := fetchChannels(authToken, status)
|
channelsToSync, err := fetchChannels(authToken, syncStatus)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.SendToSlackError("failed to fetch channels: %v", err)
|
util.SendToSlackError("failed to fetch channels: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -165,24 +155,25 @@ func selfSync(cmd *cobra.Command, args []string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
host = ""
|
host = ""
|
||||||
}
|
}
|
||||||
for loops := 0; loops < len(channelsToSync) && (limit != 0 && loops < limit); loops++ {
|
|
||||||
|
for loops := 0; loops < len(channelsToSync) && (limit == 0 || loops < limit); loops++ {
|
||||||
//avoid dereferencing
|
//avoid dereferencing
|
||||||
channel := channelsToSync[loops]
|
channel := channelsToSync[loops]
|
||||||
channelID := channel.ChannelId
|
channelID := channel.ChannelId
|
||||||
lbryChannelName := channel.DesiredChannelName
|
lbryChannelName := channel.DesiredChannelName
|
||||||
if channel.TotalVideos < 1 {
|
if channel.TotalVideos < 1 {
|
||||||
util.SendToSlackInfo("Channnel %s has no videos. Skipping", lbryChannelName)
|
util.SendToSlackInfo("Channel %s has no videos. Skipping", lbryChannelName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !channel.SyncServer.IsNull() && channel.SyncServer.String != host {
|
if !channel.SyncServer.IsNull() && channel.SyncServer.String != host {
|
||||||
util.SendToSlackInfo("Channnel %s is being synced by another server: %s", lbryChannelName, channel.SyncServer.String)
|
util.SendToSlackInfo("Channel %s is being synced by another server: %s", lbryChannelName, channel.SyncServer.String)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
//acquire the lock on the channel
|
//acquire the lock on the channel
|
||||||
err := setChannelSyncStatus(authToken, channelID, StatusSyncing)
|
err := setChannelSyncStatus(authToken, channelID, StatusSyncing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.SendToSlackError("Failed aquiring sync rights for channel %s: %v", lbryChannelName, err)
|
util.SendToSlackError("Failed acquiring sync rights for channel %s: %v", lbryChannelName, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
util.SendToSlackInfo("Syncing %s to LBRY! (iteration %d)", lbryChannelName, loops)
|
util.SendToSlackInfo("Syncing %s to LBRY! (iteration %d)", lbryChannelName, loops)
|
||||||
|
@ -220,13 +211,15 @@ func selfSync(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if s.IsInterrupted() {
|
||||||
|
break
|
||||||
|
}
|
||||||
//mark video as synced
|
//mark video as synced
|
||||||
err = setChannelSyncStatus(authToken, channelID, StatusSynced)
|
err = setChannelSyncStatus(authToken, channelID, StatusSynced)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Failed setting synced state for channel %s: %v", lbryChannelName, err)
|
msg := fmt.Sprintf("Failed setting synced state for channel %s: %v", lbryChannelName, err)
|
||||||
util.SendToSlackError(msg)
|
util.SendToSlackError(msg)
|
||||||
util.SendToSlackError("@Nikooo777 this requires manual intervention! Exiting...")
|
util.SendToSlackError("@Nikooo777 this requires manual intervention! Exiting...")
|
||||||
//this error is very bad. it requires manual intervention
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFetchChannels(t *testing.T) {
|
func TestFetchChannels(t *testing.T) {
|
||||||
res, err := fetchChannels("620280", StatusFailed)
|
res, err := fetchChannels("620280", StatusSynced)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,16 @@ type Sync struct {
|
||||||
queue chan video
|
queue chan video
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsInterrupted can be queried to discover if the sync process was interrupted manually
|
||||||
|
func (s *Sync) IsInterrupted() bool {
|
||||||
|
select {
|
||||||
|
case <-s.stop.Chan():
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Sync) FullCycle() error {
|
func (s *Sync) FullCycle() error {
|
||||||
var err error
|
var err error
|
||||||
if os.Getenv("HOME") == "" {
|
if os.Getenv("HOME") == "" {
|
||||||
|
|
Loading…
Add table
Reference in a new issue