added flags to retry errors and stop on errors

This commit is contained in:
Alex Grintsvayg 2017-10-04 10:31:03 -04:00
parent c25b5d780c
commit af3ab410cb
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
3 changed files with 57 additions and 7 deletions

View file

@ -12,6 +12,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
@ -31,6 +32,8 @@ func init() {
Short: "Publish youtube channel into LBRY network.", Short: "Publish youtube channel into LBRY network.",
Run: ytsync, Run: ytsync,
} }
ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a video fails, stop publishing")
ytSyncCmd.Flags().BoolVar(&retryErrors, "retry-errors", false, "Retry failed publishes")
RootCmd.AddCommand(ytSyncCmd) RootCmd.AddCommand(ytSyncCmd)
} }
@ -71,6 +74,8 @@ var (
ytAPIKey string ytAPIKey string
channelID string channelID string
lbryChannelName string lbryChannelName string
stopOnError bool
retryErrors bool
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
@ -87,6 +92,11 @@ func ytsync(cmd *cobra.Command, args []string) {
lbryChannelName = args[2] lbryChannelName = args[2]
} }
if stopOnError && retryErrors {
log.Errorln("--stop-on-error and --retry-errors are mutually exclusive")
return
}
redisPool = &redis.Pool{ redisPool = &redis.Pool{
MaxIdle: 3, MaxIdle: 3,
IdleTimeout: 5 * time.Minute, IdleTimeout: 5 * time.Minute,
@ -103,6 +113,15 @@ func ytsync(cmd *cobra.Command, args []string) {
var wg sync.WaitGroup var wg sync.WaitGroup
videoQueue := make(chan video) videoQueue := make(chan video)
stopEnqueuing := make(chan struct{})
sendStopEnqueuing := sync.Once{}
var videoErrored atomic.Value
videoErrored.Store(false)
if stopOnError {
log.Println("Will stop publishing if an error is detected")
}
daemon = jsonrpc.NewClient("") daemon = jsonrpc.NewClient("")
videoDirectory, err = ioutil.TempDir("", "ytsync") videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil { if err != nil {
@ -137,15 +156,34 @@ func ytsync(cmd *cobra.Command, args []string) {
if !more { if !more {
return return
} }
err := processVideo(v) if stopOnError && videoErrored.Load().(bool) {
if err != nil { log.Println("Video errored. Exiting")
log.Errorln("error processing video: " + err.Error()) return
}
for {
err := processVideo(v)
if err != nil {
log.Errorln("error processing video: " + err.Error())
if stopOnError {
videoErrored.Store(true)
sendStopEnqueuing.Do(func() {
stopEnqueuing <- struct{}{}
})
}
}
if err != nil && retryErrors {
log.Println("Retrying")
} else {
break
}
} }
} }
}() }()
} }
err = enqueueVideosFromChannel(channelID, &videoQueue) err = enqueueVideosFromChannel(channelID, &videoQueue, &stopEnqueuing)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -192,7 +230,7 @@ func ensureChannelOwnership() error {
return nil return nil
} }
func enqueueVideosFromChannel(channelID string, videoChan *chan video) error { func enqueueVideosFromChannel(channelID string, videoChan *chan video, stopEnqueuing *chan struct{}) error {
client := &http.Client{ client := &http.Client{
Transport: &transport.APIKey{Key: ytAPIKey}, Transport: &transport.APIKey{Key: ytAPIKey},
} }
@ -270,14 +308,18 @@ func enqueueVideosFromChannel(channelID string, videoChan *chan video) error {
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) //or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
for _, v := range videos { for _, v := range videos {
*videoChan <- v select {
case *videoChan <- v:
case <-*stopEnqueuing:
return nil
}
} }
return nil return nil
} }
func processVideo(v video) error { func processVideo(v video) error {
log.Println("Processing " + v.id) log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)")
conn := redisPool.Get() conn := redisPool.Get()
defer conn.Close() defer conn.Close()

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
"sort"
"strconv" "strconv"
"strings" "strings"
@ -60,6 +61,7 @@ func debugParams(params map[string]interface{}) string {
} }
s = append(s, fmt.Sprintf("%s=%+v", k, v)) s = append(s, fmt.Sprintf("%s=%+v", k, v))
} }
sort.Strings(s)
return strings.Join(s, " ") return strings.Join(s, " ")
} }

View file

@ -113,6 +113,12 @@ func fixDecodeProto(src, dest reflect.Type, data interface{}) (interface{}, erro
return nil, err return nil, err
} }
return decimal.NewFromFloat(val), nil return decimal.NewFromFloat(val), nil
} else if s, ok := data.(string); ok {
d, err := decimal.NewFromString(s)
if err != nil {
return nil, err
}
return d, nil
} }
case reflect.TypeOf(lbryschema.Metadata_Version(0)): case reflect.TypeOf(lbryschema.Metadata_Version(0)):