improve continuos self sync

fix bug in error handling while deleting videos
refactoring of code
This commit is contained in:
Niko Storni 2018-05-24 22:03:54 -04:00
parent 744021cfbc
commit 5a1ba06f4c
No known key found for this signature in database
GPG key ID: F37FE63398800368
2 changed files with 112 additions and 41 deletions

View file

@ -5,10 +5,10 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os"
"os/user"
url2 "net/url" url2 "net/url"
"os"
"os/user"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null" "github.com/lbryio/lbry.go/null"
@ -31,7 +31,7 @@ func init() {
selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") selfSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") selfSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
selfSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)") selfSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)")
selfSyncCmd.Flags().StringVar(&syncStatus, "status", StatusQueued, "Specify which queue to pull from. (Default: queued)") selfSyncCmd.Flags().StringVar(&syncStatus, "status", StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
RootCmd.AddCommand(selfSyncCmd) RootCmd.AddCommand(selfSyncCmd)
} }
@ -100,6 +100,22 @@ func setChannelSyncStatus(authToken string, channelID string, status string) err
return errors.Err("invalid API response") return errors.Err("invalid API response")
} }
func spaceCheck() error {
usr, err := user.Current()
if err != nil {
return err
}
usedPctile, err := util.GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
if err != nil {
return err
}
if usedPctile > 0.90 && !skipSpaceCheck {
return errors.Err("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)
}
util.SendToSlackInfo("disk usage: %.1f%%", usedPctile*100)
return nil
}
func selfSync(cmd *cobra.Command, args []string) { func selfSync(cmd *cobra.Command, args []string) {
slackToken := os.Getenv("SLACK_TOKEN") slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" { if slackToken == "" {
@ -107,21 +123,6 @@ func selfSync(cmd *cobra.Command, args []string) {
} else { } else {
util.InitSlack(os.Getenv("SLACK_TOKEN")) util.InitSlack(os.Getenv("SLACK_TOKEN"))
} }
usr, err := user.Current()
if err != nil {
util.SendToSlackError(err.Error())
return
}
usedPctile, err := util.GetUsedSpace(usr.HomeDir + "/.lbrynet/blobfiles/")
if err != nil {
util.SendToSlackError(err.Error())
return
}
if usedPctile > 0.90 && !skipSpaceCheck {
util.SendToSlackError("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)
return
}
util.SendToSlackInfo("disk usage: %.1f%%", usedPctile*100)
ytAPIKey := args[0] ytAPIKey := args[0]
authToken := args[1] authToken := args[1]
@ -146,19 +147,93 @@ func selfSync(cmd *cobra.Command, args []string) {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense") log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return return
} }
syncCount := 0
if syncStatus == StatusQueued {
for {
//before processing the queued ones first clear the pending ones (if any)
//TODO: extract method
syncingChannels, err := fetchChannels(authToken, StatusSyncing)
if err != nil {
util.SendToSlackError("failed to fetch channels: %v", err)
break
}
interruptedByUser, err := syncChannels(syncingChannels, authToken, ytAPIKey, &syncCount)
if err != nil {
util.SendToSlackError("%v", err)
break
}
if interruptedByUser {
break
}
util.SendToSlackInfo("Finished syncing pending channels")
//process queued channels
queuedChannels, err := fetchChannels(authToken, StatusQueued)
if err != nil {
util.SendToSlackError("failed to fetch channels: %v", err)
break
}
interruptedByUser, err = syncChannels(queuedChannels, authToken, ytAPIKey, &syncCount)
if err != nil {
util.SendToSlackError("%v", err)
break
}
if interruptedByUser {
break
}
util.SendToSlackInfo("Finished syncing queued channels")
if syncUpdate {
//update synced channels
syncedChannels, err := fetchChannels(authToken, StatusSynced)
if err != nil {
util.SendToSlackError("failed to fetch channels: %v", err)
break
}
interruptedByUser, err = syncChannels(syncedChannels, authToken, ytAPIKey, &syncCount)
if err != nil {
util.SendToSlackError("%v", err)
break
}
if interruptedByUser {
break
}
util.SendToSlackInfo("Finished updating channels")
}
}
} else {
// sync whatever was specified
channelsToSync, err := fetchChannels(authToken, syncStatus) channelsToSync, err := fetchChannels(authToken, syncStatus)
if err != nil { if err != nil {
util.SendToSlackError("failed to fetch channels: %v", err) util.SendToSlackError("failed to fetch channels: %v", err)
return return
} }
interruptedByUser, err := syncChannels(channelsToSync, authToken, ytAPIKey, &syncCount)
if err != nil {
util.SendToSlackError("%v", err)
return
}
if interruptedByUser {
return
}
}
util.SendToSlackInfo("Syncing process terminated!")
}
// syncChannels processes a slice of youtube channels (channelsToSync) and returns a bool that indicates whether
// the execution finished by itself or was interrupted by the user and an error if anything happened
func syncChannels(channelsToSync []APIYoutubeChannel, authToken string, ytAPIKey string, syncCount *int) (bool, error) {
host, err := os.Hostname() host, err := os.Hostname()
if err != nil { if err != nil {
host = "" host = ""
} }
for ; *syncCount < len(channelsToSync) && (limit == 0 || *syncCount < limit); *syncCount++ {
for loops := 0; loops < len(channelsToSync) && (limit == 0 || loops < limit); loops++ { err = spaceCheck()
if err != nil {
return false, err
}
//avoid dereferencing //avoid dereferencing
channel := channelsToSync[loops] channel := channelsToSync[*syncCount]
channelID := channel.ChannelId channelID := channel.ChannelId
lbryChannelName := channel.DesiredChannelName lbryChannelName := channel.DesiredChannelName
if channel.TotalVideos < 1 { if channel.TotalVideos < 1 {
@ -176,7 +251,7 @@ func selfSync(cmd *cobra.Command, args []string) {
util.SendToSlackError("Failed acquiring sync rights for channel %s: %v", lbryChannelName, err) util.SendToSlackError("Failed acquiring sync rights for channel %s: %v", lbryChannelName, err)
continue continue
} }
util.SendToSlackInfo("Syncing %s to LBRY! (iteration %d)", lbryChannelName, loops) util.SendToSlackInfo("Syncing %s to LBRY! (iteration %d)", lbryChannelName, *syncCount)
s := sync.Sync{ s := sync.Sync{
YoutubeAPIKey: ytAPIKey, YoutubeAPIKey: ytAPIKey,
@ -198,30 +273,25 @@ func selfSync(cmd *cobra.Command, args []string) {
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR", "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
} }
if util.InSliceContains(err.Error(), fatalErrors) { if util.InSliceContains(err.Error(), fatalErrors) {
util.SendToSlackError("@Nikooo777 this requires manual intervention! Exiting...") return s.IsInterrupted(), errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
break
} }
//mark video as failed //mark video as failed
err := setChannelSyncStatus(authToken, channelID, StatusFailed) err := setChannelSyncStatus(authToken, channelID, StatusFailed)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s: %v", lbryChannelName, err) msg := fmt.Sprintf("Failed setting failed state for channel %s. \n@Nikooo777 this requires manual intervention! Exiting...", lbryChannelName)
util.SendToSlackError(msg) return s.IsInterrupted(), errors.Prefix(msg, err)
util.SendToSlackError("@Nikooo777 this requires manual intervention! Exiting...")
break
} }
continue continue
} }
if s.IsInterrupted() { if s.IsInterrupted() {
break return true, nil
} }
//mark video as synced //mark video as synced
err = setChannelSyncStatus(authToken, channelID, StatusSynced) err = setChannelSyncStatus(authToken, channelID, StatusSynced)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting synced state for channel %s: %v", lbryChannelName, err) msg := fmt.Sprintf("Failed setting failed state for channel %s. \n@Nikooo777 this requires manual intervention! Exiting...", lbryChannelName)
util.SendToSlackError(msg) return false, errors.Prefix(msg, err)
util.SendToSlackError("@Nikooo777 this requires manual intervention! Exiting...")
break
} }
} }
util.SendToSlackInfo("Syncing process terminated!") return false, nil
} }

View file

@ -210,10 +210,11 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
err = v.publish(daemon, claimAddress, amount, channelName) err = v.publish(daemon, claimAddress, amount, channelName)
//delete the video in all cases //delete the video in all cases
if v.delete() != nil { dErr := v.delete()
if dErr != nil {
// the video was published anyway so it should be marked as published // the video was published anyway so it should be marked as published
// for that to happen, no errors should be returned any further than here // for that to happen, no errors should be returned any further than here
log.Debugln(errors.Prefix("delete error", err)) log.Debugln(errors.Prefix("delete error", dErr))
} }
if err != nil { if err != nil {
return errors.Prefix("publish error", err) return errors.Prefix("publish error", err)