remove common.go
merge sync and selfsync export sync manager out of cmd package clean up ytsync.go address #19 and #20
This commit is contained in:
parent
4a4008d6a0
commit
dfa3bb1b19
5 changed files with 330 additions and 421 deletions
|
@ -1,27 +0,0 @@
|
|||
package cmd
|
||||
|
||||
const defaultMaxTries = 3
|
||||
|
||||
var (
|
||||
stopOnError bool
|
||||
maxTries int
|
||||
takeOverExistingChannel bool
|
||||
refill int
|
||||
limit int
|
||||
skipSpaceCheck bool
|
||||
syncUpdate bool
|
||||
syncStatus string
|
||||
syncFrom int64
|
||||
syncUntil int64
|
||||
concurrentJobs int
|
||||
)
|
||||
|
||||
const (
|
||||
StatusPending = "pending" // waiting for permission to sync
|
||||
StatusQueued = "queued" // in sync queue. will be synced soon
|
||||
StatusSyncing = "syncing" // syncing now
|
||||
StatusSynced = "synced" // done
|
||||
StatusFailed = "failed"
|
||||
)
|
||||
|
||||
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed}
|
315
cmd/selfsync.go
315
cmd/selfsync.go
|
@ -1,29 +1,32 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
url2 "net/url"
|
||||
"os"
|
||||
|
||||
"os/user"
|
||||
|
||||
"time"
|
||||
|
||||
"strconv"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/null"
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
sync "github.com/lbryio/lbry.go/ytsync"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var APIURL string
|
||||
var APIToken string
|
||||
const defaultMaxTries = 3
|
||||
|
||||
var (
|
||||
stopOnError bool
|
||||
maxTries int
|
||||
takeOverExistingChannel bool
|
||||
refill int
|
||||
limit int
|
||||
skipSpaceCheck bool
|
||||
syncUpdate bool
|
||||
syncStatus string
|
||||
channelID string
|
||||
syncFrom int64
|
||||
syncUntil int64
|
||||
concurrentJobs int
|
||||
)
|
||||
|
||||
func init() {
|
||||
var selfSyncCmd = &cobra.Command{
|
||||
|
@ -38,138 +41,32 @@ func init() {
|
|||
selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
|
||||
selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
||||
selfSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)")
|
||||
selfSyncCmd.Flags().StringVar(&syncStatus, "status", StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
|
||||
selfSyncCmd.Flags().StringVar(&syncStatus, "status", sync.StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
|
||||
selfSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
|
||||
selfSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
|
||||
selfSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
|
||||
selfSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently (Default: 1)")
|
||||
|
||||
RootCmd.AddCommand(selfSyncCmd)
|
||||
APIURL = os.Getenv("LBRY_API")
|
||||
APIToken = os.Getenv("LBRY_API_TOKEN")
|
||||
if APIURL == "" {
|
||||
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API")
|
||||
return
|
||||
}
|
||||
if APIToken == "" {
|
||||
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type APIJobsResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data []APIYoutubeChannel `json:"data"`
|
||||
}
|
||||
|
||||
type APIYoutubeChannel struct {
|
||||
ChannelId string `json:"channel_id"`
|
||||
TotalVideos uint `json:"total_videos"`
|
||||
DesiredChannelName string `json:"desired_channel_name"`
|
||||
SyncServer null.String `json:"sync_server"`
|
||||
}
|
||||
|
||||
func fetchChannels(status string) ([]APIYoutubeChannel, error) {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, errors.Err("could not detect system hostname")
|
||||
}
|
||||
url := APIURL + "/yt/jobs"
|
||||
res, _ := http.PostForm(url, url2.Values{
|
||||
"auth_token": {APIToken},
|
||||
"sync_status": {status},
|
||||
"min_videos": {strconv.Itoa(1)},
|
||||
"after": {strconv.Itoa(int(syncFrom))},
|
||||
"before": {strconv.Itoa(int(syncUntil))},
|
||||
"sync_server": {host},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response APIJobsResponse
|
||||
err = json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if response.Data == nil {
|
||||
return nil, errors.Err(response.Error)
|
||||
}
|
||||
log.Printf("Fetched channels: %d", len(response.Data))
|
||||
return response.Data, nil
|
||||
}
|
||||
|
||||
type APISyncUpdateResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
|
||||
func setChannelSyncStatus(channelID string, status string) error {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
return errors.Err("could not detect system hostname")
|
||||
}
|
||||
url := APIURL + "/yt/sync_update"
|
||||
|
||||
res, _ := http.PostForm(url, url2.Values{
|
||||
"channel_id": {channelID},
|
||||
"sync_server": {host},
|
||||
"auth_token": {APIToken},
|
||||
"sync_status": {status},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response APISyncUpdateResponse
|
||||
err = json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return errors.Err(response.Error.String)
|
||||
}
|
||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
||||
return nil
|
||||
}
|
||||
return errors.Err("invalid API response")
|
||||
}
|
||||
|
||||
func spaceCheck() error {
|
||||
usr, err := user.Current()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
usedPctile, err := util.GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if usedPctile >= 0.90 && !skipSpaceCheck {
|
||||
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
|
||||
}
|
||||
util.SendInfoToSlack("disk usage: %.1f%%", usedPctile*100)
|
||||
return nil
|
||||
}
|
||||
|
||||
func selfSync(cmd *cobra.Command, args []string) {
|
||||
var hostname string
|
||||
slackToken := os.Getenv("SLACK_TOKEN")
|
||||
if slackToken == "" {
|
||||
log.Error("A slack token was not present in env vars! Slack messages disabled!")
|
||||
} else {
|
||||
host, err := os.Hostname()
|
||||
var err error
|
||||
hostname, err = os.Hostname()
|
||||
if err != nil {
|
||||
log.Error("could not detect system hostname")
|
||||
host = "ytsync-unknown"
|
||||
hostname = "ytsync-unknown"
|
||||
}
|
||||
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), host)
|
||||
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
|
||||
}
|
||||
err := spaceCheck()
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(err.Error())
|
||||
return
|
||||
}
|
||||
ytAPIKey := os.Getenv("YOUTUBE_API_KEY")
|
||||
//authToken := args[1]
|
||||
|
||||
if !util.InSlice(syncStatus, SyncStatuses) {
|
||||
log.Errorf("status must be one of the following: %v\n", SyncStatuses)
|
||||
if !util.InSlice(syncStatus, sync.SyncStatuses) {
|
||||
log.Errorf("status must be one of the following: %v\n", sync.SyncStatuses)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -186,148 +83,26 @@ func selfSync(cmd *cobra.Command, args []string) {
|
|||
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
|
||||
return
|
||||
}
|
||||
syncCount := 0
|
||||
if syncStatus == StatusQueued {
|
||||
mainLoop:
|
||||
for {
|
||||
//before processing the queued ones first clear the pending ones (if any)
|
||||
//TODO: extract method
|
||||
queuesToSync := []string{
|
||||
StatusSyncing,
|
||||
}
|
||||
if syncUpdate {
|
||||
queuesToSync = append(queuesToSync, StatusSynced)
|
||||
} else {
|
||||
queuesToSync = append(queuesToSync, StatusQueued)
|
||||
}
|
||||
for _, v := range queuesToSync {
|
||||
interruptedByUser, err := processQueue(v, ytAPIKey, &syncCount)
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(err.Error())
|
||||
break mainLoop
|
||||
}
|
||||
if interruptedByUser {
|
||||
util.SendInfoToSlack("interrupted by user!")
|
||||
break mainLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// sync whatever was specified
|
||||
_, err := processQueue(syncStatus, ytAPIKey, &syncCount)
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(err.Error())
|
||||
return
|
||||
}
|
||||
sm := sync.SyncManager{
|
||||
StopOnError: stopOnError,
|
||||
MaxTries: maxTries,
|
||||
TakeOverExistingChannel: takeOverExistingChannel,
|
||||
Refill: refill,
|
||||
Limit: limit,
|
||||
SkipSpaceCheck: skipSpaceCheck,
|
||||
SyncUpdate: syncUpdate,
|
||||
SyncStatus: syncStatus,
|
||||
SyncFrom: syncFrom,
|
||||
SyncUntil: syncUntil,
|
||||
ConcurrentJobs: concurrentJobs,
|
||||
ConcurrentVideos: concurrentJobs,
|
||||
HostName: hostname,
|
||||
YoutubeChannelID: channelID,
|
||||
}
|
||||
|
||||
err := sm.Start()
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(err.Error())
|
||||
}
|
||||
util.SendInfoToSlack("Syncing process terminated!")
|
||||
}
|
||||
|
||||
func processQueue(queueStatus string, ytAPIKey string, syncCount *int) (bool, error) {
|
||||
util.SendInfoToSlack("Syncing %s channels", queueStatus)
|
||||
channelsToSync, err := fetchChannels(queueStatus)
|
||||
if err != nil {
|
||||
return false, errors.Prefix("failed to fetch channels", err)
|
||||
}
|
||||
filteredChannelsToSync := make([]APIYoutubeChannel, len(channelsToSync))
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
return false, errors.Err("could not detect system hostname")
|
||||
}
|
||||
index := 0
|
||||
for _, v := range channelsToSync {
|
||||
if !v.SyncServer.IsNull() && v.SyncServer.String != host {
|
||||
filteredChannelsToSync[index] = v
|
||||
index++
|
||||
}
|
||||
}
|
||||
interrupted, err := syncChannels(channelsToSync, ytAPIKey, syncCount)
|
||||
util.SendInfoToSlack("Finished syncing %s channels", queueStatus)
|
||||
return interrupted, err
|
||||
}
|
||||
|
||||
// syncChannels processes a slice of youtube channels (channelsToSync) and returns a bool that indicates whether
|
||||
// the execution finished by itself or was interrupted by the user and an error if anything happened
|
||||
func syncChannels(channelsToSync []APIYoutubeChannel, ytAPIKey string, syncCount *int) (bool, error) {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
host = ""
|
||||
}
|
||||
for loop := 0; loop < len(channelsToSync) && (limit == 0 || *syncCount < limit); loop++ {
|
||||
//log.Printf("inside loop: %d", loop)
|
||||
err = spaceCheck()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
//avoid dereferencing
|
||||
channel := channelsToSync[loop]
|
||||
channelID := channel.ChannelId
|
||||
lbryChannelName := channel.DesiredChannelName
|
||||
if channel.TotalVideos < 1 {
|
||||
util.SendInfoToSlack("Channel %s has no videos. Skipping", lbryChannelName)
|
||||
continue
|
||||
}
|
||||
if !channel.SyncServer.IsNull() && channel.SyncServer.String != host {
|
||||
util.SendInfoToSlack("Channel %s is being synced by another server: %s", lbryChannelName, channel.SyncServer.String)
|
||||
continue
|
||||
}
|
||||
|
||||
//acquire the lock on the channel
|
||||
err := setChannelSyncStatus(channelID, StatusSyncing)
|
||||
if err != nil {
|
||||
//util.SendErrorToSlack("Failed acquiring sync rights for channel %s: %v", lbryChannelName, err)
|
||||
continue
|
||||
}
|
||||
//increment only if successfully acquired lock
|
||||
*syncCount++
|
||||
util.SendInfoToSlack("Syncing %s to LBRY! (iteration %d)", lbryChannelName, *syncCount)
|
||||
|
||||
s := sync.Sync{
|
||||
YoutubeAPIKey: ytAPIKey,
|
||||
YoutubeChannelID: channelID,
|
||||
LbryChannelName: lbryChannelName,
|
||||
StopOnError: stopOnError,
|
||||
MaxTries: maxTries,
|
||||
ConcurrentVideos: concurrentJobs,
|
||||
TakeOverExistingChannel: takeOverExistingChannel,
|
||||
Refill: refill,
|
||||
}
|
||||
|
||||
err = s.FullCycle()
|
||||
util.SendInfoToSlack("Syncing " + lbryChannelName + " reached an end.")
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(errors.FullTrace(err))
|
||||
fatalErrors := []string{
|
||||
"default_wallet already exists",
|
||||
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
|
||||
"NotEnoughFunds",
|
||||
"no space left on device",
|
||||
}
|
||||
//mark video as failed
|
||||
err2 := setChannelSyncStatus(channelID, StatusFailed)
|
||||
if err2 != nil {
|
||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", lbryChannelName)
|
||||
err2 = errors.Prefix(msg, err2)
|
||||
util.SendErrorToSlack(err2.Error())
|
||||
}
|
||||
if util.InSliceContains(err.Error(), fatalErrors) {
|
||||
return true, errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
if s.IsInterrupted() {
|
||||
return true, nil
|
||||
}
|
||||
//mark video as synced
|
||||
err = setChannelSyncStatus(channelID, StatusSynced)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed setting failed state for channel %s. \n@Nikooo777 this requires manual intervention! Exiting...", lbryChannelName)
|
||||
return false, errors.Prefix(msg, err)
|
||||
}
|
||||
}
|
||||
if limit != 0 && *syncCount >= limit {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/user"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
sync "github.com/lbryio/lbry.go/ytsync"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func init() {
|
||||
var ytSyncCmd = &cobra.Command{
|
||||
Use: "ytsync <youtube_api_key> <lbry_channel_name> [<youtube_channel_id>]",
|
||||
Args: cobra.RangeArgs(2, 3),
|
||||
Short: "Publish youtube channel into LBRY network.",
|
||||
Run: ytsync,
|
||||
}
|
||||
ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
|
||||
ytSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
|
||||
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
||||
ytSyncCmd.Flags().IntVar(&refill, "refill", 0, "Also add this many credits to the wallet")
|
||||
ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
||||
RootCmd.AddCommand(ytSyncCmd)
|
||||
}
|
||||
|
||||
func ytsync(cmd *cobra.Command, args []string) {
|
||||
slackToken := os.Getenv("SLACK_TOKEN")
|
||||
if slackToken == "" {
|
||||
log.Error("A slack token was not present in env vars! Slack messages disabled!")
|
||||
} else {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
log.Error("could not detect system hostname")
|
||||
host = "ytsync-unknown"
|
||||
}
|
||||
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), host)
|
||||
}
|
||||
usr, err := user.Current()
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(err.Error())
|
||||
return
|
||||
}
|
||||
usedPctile, err := util.GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(err.Error())
|
||||
return
|
||||
}
|
||||
if usedPctile > 0.9 && !skipSpaceCheck {
|
||||
util.SendErrorToSlack("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)
|
||||
return
|
||||
}
|
||||
util.SendInfoToSlack("disk usage: %.1f%%", usedPctile*100)
|
||||
|
||||
ytAPIKey := args[0]
|
||||
lbryChannelName := args[1]
|
||||
if string(lbryChannelName[0]) != "@" {
|
||||
log.Errorln("LBRY channel name must start with an @")
|
||||
return
|
||||
}
|
||||
|
||||
channelID := ""
|
||||
if len(args) > 2 {
|
||||
channelID = args[2]
|
||||
}
|
||||
|
||||
if stopOnError && maxTries != defaultMaxTries {
|
||||
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
|
||||
return
|
||||
}
|
||||
if maxTries < 1 {
|
||||
log.Errorln("setting --max-tries less than 1 doesn't make sense")
|
||||
return
|
||||
}
|
||||
util.SendInfoToSlack("Syncing " + lbryChannelName + " to LBRY!")
|
||||
|
||||
s := sync.Sync{
|
||||
YoutubeAPIKey: ytAPIKey,
|
||||
YoutubeChannelID: channelID,
|
||||
LbryChannelName: lbryChannelName,
|
||||
StopOnError: stopOnError,
|
||||
MaxTries: maxTries,
|
||||
ConcurrentVideos: 1,
|
||||
TakeOverExistingChannel: takeOverExistingChannel,
|
||||
Refill: refill,
|
||||
}
|
||||
|
||||
err = s.FullCycle()
|
||||
|
||||
if err != nil {
|
||||
util.SendErrorToSlack(errors.FullTrace(err))
|
||||
}
|
||||
util.SendInfoToSlack("Syncing " + lbryChannelName + " reached an end.")
|
||||
}
|
255
ytsync/manager.go
Normal file
255
ytsync/manager.go
Normal file
|
@ -0,0 +1,255 @@
|
|||
package ytsync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/user"
|
||||
"strconv"
|
||||
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/null"
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type SyncManager struct {
|
||||
StopOnError bool
|
||||
MaxTries int
|
||||
TakeOverExistingChannel bool
|
||||
Refill int
|
||||
Limit int
|
||||
SkipSpaceCheck bool
|
||||
SyncUpdate bool
|
||||
SyncStatus string
|
||||
SyncFrom int64
|
||||
SyncUntil int64
|
||||
ConcurrentJobs int
|
||||
ConcurrentVideos int
|
||||
HostName string
|
||||
YoutubeChannelID string
|
||||
|
||||
youtubeAPIKey string
|
||||
apiURL string
|
||||
apiToken string
|
||||
}
|
||||
|
||||
const (
|
||||
StatusPending = "pending" // waiting for permission to sync
|
||||
StatusQueued = "queued" // in sync queue. will be synced soon
|
||||
StatusSyncing = "syncing" // syncing now
|
||||
StatusSynced = "synced" // done
|
||||
StatusFailed = "failed"
|
||||
)
|
||||
|
||||
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed}
|
||||
|
||||
type apiJobsResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data []apiYoutubeChannel `json:"data"`
|
||||
}
|
||||
|
||||
type apiYoutubeChannel struct {
|
||||
ChannelId string `json:"channel_id"`
|
||||
TotalVideos uint `json:"total_videos"`
|
||||
DesiredChannelName string `json:"desired_channel_name"`
|
||||
SyncServer null.String `json:"sync_server"`
|
||||
}
|
||||
|
||||
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
|
||||
endpoint := s.apiURL + "/yt/jobs"
|
||||
res, _ := http.PostForm(endpoint, url.Values{
|
||||
"auth_token": {s.apiToken},
|
||||
"sync_status": {status},
|
||||
"min_videos": {strconv.Itoa(1)},
|
||||
"after": {strconv.Itoa(int(s.SyncFrom))},
|
||||
"before": {strconv.Itoa(int(s.SyncUntil))},
|
||||
//"sync_server": {s.HostName},
|
||||
"channel_id": {s.YoutubeChannelID},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiJobsResponse
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if response.Data == nil {
|
||||
return nil, errors.Err(response.Error)
|
||||
}
|
||||
log.Printf("Fetched channels: %d", len(response.Data))
|
||||
return response.Data, nil
|
||||
}
|
||||
|
||||
type apiSyncUpdateResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
|
||||
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
|
||||
endpoint := s.apiURL + "/yt/sync_update"
|
||||
|
||||
res, _ := http.PostForm(endpoint, url.Values{
|
||||
"channel_id": {channelID},
|
||||
"sync_server": {s.HostName},
|
||||
"auth_token": {s.apiToken},
|
||||
"sync_status": {status},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiSyncUpdateResponse
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return errors.Err(response.Error.String)
|
||||
}
|
||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
||||
return nil
|
||||
}
|
||||
return errors.Err("invalid API response")
|
||||
}
|
||||
|
||||
func (s SyncManager) Start() error {
|
||||
s.apiURL = os.Getenv("LBRY_API")
|
||||
s.apiToken = os.Getenv("LBRY_API_TOKEN")
|
||||
s.youtubeAPIKey = os.Getenv("YOUTUBE_API_KEY")
|
||||
if s.apiURL == "" {
|
||||
return errors.Err("An API URL was not defined. Please set the environment variable LBRY_API")
|
||||
}
|
||||
if s.apiToken == "" {
|
||||
return errors.Err("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
|
||||
}
|
||||
if s.youtubeAPIKey == "" {
|
||||
return errors.Err("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
|
||||
}
|
||||
|
||||
syncCount := 0
|
||||
for {
|
||||
err := s.checkUsedSpace()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var syncs []Sync
|
||||
shouldInterruptLoop := false
|
||||
|
||||
isSingleChannelSync := s.YoutubeChannelID != ""
|
||||
if isSingleChannelSync {
|
||||
channels, err := s.fetchChannels("")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(channels) != 1 {
|
||||
return errors.Err("Expected 1 channel, %d returned", len(channels))
|
||||
}
|
||||
lbryChannelName := channels[0].DesiredChannelName
|
||||
if !s.isWorthProcessing(channels[0]) {
|
||||
break
|
||||
}
|
||||
syncs = make([]Sync, 1)
|
||||
syncs[0] = Sync{
|
||||
YoutubeAPIKey: s.youtubeAPIKey,
|
||||
YoutubeChannelID: s.YoutubeChannelID,
|
||||
LbryChannelName: lbryChannelName,
|
||||
StopOnError: s.StopOnError,
|
||||
MaxTries: s.MaxTries,
|
||||
ConcurrentVideos: s.ConcurrentVideos,
|
||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
||||
Refill: s.Refill,
|
||||
Manager: &s,
|
||||
}
|
||||
shouldInterruptLoop = true
|
||||
} else {
|
||||
var queuesToSync []string
|
||||
if s.SyncStatus != "" {
|
||||
queuesToSync = append(queuesToSync, s.SyncStatus)
|
||||
} else if s.SyncUpdate {
|
||||
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
|
||||
} else {
|
||||
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
|
||||
}
|
||||
for _, q := range queuesToSync {
|
||||
channels, err := s.fetchChannels(q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, c := range channels {
|
||||
if !s.isWorthProcessing(c) {
|
||||
continue
|
||||
}
|
||||
syncs = append(syncs, Sync{
|
||||
YoutubeAPIKey: s.youtubeAPIKey,
|
||||
YoutubeChannelID: c.ChannelId,
|
||||
LbryChannelName: c.DesiredChannelName,
|
||||
StopOnError: s.StopOnError,
|
||||
MaxTries: s.MaxTries,
|
||||
ConcurrentVideos: s.ConcurrentVideos,
|
||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
||||
Refill: s.Refill,
|
||||
Manager: &s,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(syncs) == 0 {
|
||||
log.Infoln("No channels to sync. Pausing 5 minutes!")
|
||||
time.Sleep(5 * time.Minute)
|
||||
}
|
||||
for i, sync := range syncs {
|
||||
util.SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, i, len(syncs), syncCount)
|
||||
err := sync.FullCycle()
|
||||
if err != nil {
|
||||
fatalErrors := []string{
|
||||
"default_wallet already exists",
|
||||
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
|
||||
"NotEnoughFunds",
|
||||
"no space left on device",
|
||||
}
|
||||
if util.InSliceContains(err.Error(), fatalErrors) {
|
||||
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
|
||||
}
|
||||
util.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
|
||||
}
|
||||
util.SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, i, len(syncs), syncCount)
|
||||
syncCount++
|
||||
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
|
||||
shouldInterruptLoop = true
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
if shouldInterruptLoop {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
|
||||
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
|
||||
}
|
||||
|
||||
func (s SyncManager) checkUsedSpace() error {
|
||||
usr, err := user.Current()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
usedPctile, err := util.GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if usedPctile >= 0.90 && !s.SkipSpaceCheck {
|
||||
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
|
||||
}
|
||||
util.SendInfoToSlack("disk usage: %.1f%%", usedPctile*100)
|
||||
return nil
|
||||
}
|
|
@ -59,6 +59,7 @@ type Sync struct {
|
|||
ConcurrentVideos int
|
||||
TakeOverExistingChannel bool
|
||||
Refill int
|
||||
Manager *SyncManager
|
||||
|
||||
daemon *jsonrpc.Client
|
||||
claimAddress string
|
||||
|
@ -82,19 +83,40 @@ func (s *Sync) IsInterrupted() bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Sync) FullCycle() error {
|
||||
var err error
|
||||
func (s *Sync) FullCycle() (e error) {
|
||||
if os.Getenv("HOME") == "" {
|
||||
return errors.Err("no $HOME env var found")
|
||||
}
|
||||
|
||||
if s.YoutubeChannelID == "" {
|
||||
channelID, err := getChannelIDFromFile(s.LbryChannelName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.YoutubeChannelID = channelID
|
||||
return errors.Err("channel ID not provided")
|
||||
}
|
||||
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if e != nil {
|
||||
//conditions for which a channel shouldn't be marked as failed
|
||||
noFailConditions := []string{
|
||||
"this youtube channel is being managed by another server",
|
||||
}
|
||||
if util.InSliceContains(e.Error(), noFailConditions) {
|
||||
return
|
||||
}
|
||||
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
||||
err = errors.Prefix(msg, err)
|
||||
e = errors.Prefix(err.Error(), e)
|
||||
}
|
||||
} else {
|
||||
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
|
||||
if err != nil {
|
||||
e = err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
|
||||
if os.Getenv("REGTEST") == "true" {
|
||||
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
|
||||
|
@ -493,26 +515,6 @@ func stopDaemonViaSystemd() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func getChannelIDFromFile(channelName string) (string, error) {
|
||||
channelsJSON, err := ioutil.ReadFile("./channels")
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
var channels map[string]string
|
||||
err = json.Unmarshal(channelsJSON, &channels)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
channelID, ok := channels[channelName]
|
||||
if !ok {
|
||||
return "", errors.Err("channel not in list")
|
||||
}
|
||||
|
||||
return channelID, nil
|
||||
}
|
||||
|
||||
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
|
||||
func waitForDaemonProcess(timeout time.Duration) error {
|
||||
processes, err := ps.Processes()
|
||||
|
|
Loading…
Add table
Reference in a new issue