remove unused flag

always go through syncing first
don't sync videos shorter than 7 seconds
refactor code in video error handling
add interface to handle hard video failures (incomplete)
This commit is contained in:
Niko Storni 2020-11-03 21:41:39 +01:00
parent beade71aa6
commit fecf67118c
5 changed files with 108 additions and 95 deletions

View file

@ -42,7 +42,6 @@ func main() {
Args: cobra.RangeArgs(0, 0), Args: cobra.RangeArgs(0, 0),
} }
cmd.Flags().BoolVar(&cliFlags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync") cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync")
@ -93,10 +92,6 @@ func ytSync(cmd *cobra.Command, args []string) {
return return
} }
if cliFlags.StopOnError && cliFlags.MaxTries != defaultMaxTries {
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
return
}
if cliFlags.MaxTries < 1 { if cliFlags.MaxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense") log.Errorln("setting --max-tries less than 1 doesn't make sense")
return return

View file

@ -3,6 +3,7 @@ package manager
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@ -43,6 +44,9 @@ func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) {
DbChannelData: channel, DbChannelData: channel,
Manager: s, Manager: s,
namer: namer.NewNamer(), namer: namer.NewNamer(),
hardVideoFailure: hardVideoFailure{
lock: &sync.Mutex{},
},
}) })
} }
@ -78,7 +82,7 @@ func (s *SyncManager) Start() error {
} else { } else {
var queuesToSync []string var queuesToSync []string
if s.CliFlags.Status != "" { if s.CliFlags.Status != "" {
queuesToSync = append(queuesToSync, s.CliFlags.Status) queuesToSync = append(queuesToSync, shared.StatusSyncing, s.CliFlags.Status)
} else if s.CliFlags.SyncUpdate { } else if s.CliFlags.SyncUpdate {
queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced) queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced)
} else { } else {

View file

@ -53,6 +53,23 @@ type Sync struct {
walletMux *sync.RWMutex walletMux *sync.RWMutex
queue chan ytapi.Video queue chan ytapi.Video
defaultAccountID string defaultAccountID string
hardVideoFailure hardVideoFailure
}
type hardVideoFailure struct {
lock *sync.Mutex
failed bool
failureReason string
}
func (hv *hardVideoFailure) flagFailure(reason string) {
hv.lock.Lock()
defer hv.lock.Unlock()
if hv.failed {
return
}
hv.failed = true
hv.failureReason = reason
} }
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) {
@ -690,10 +707,6 @@ func (s *Sync) doSync() error {
} }
} }
if s.Manager.CliFlags.StopOnError {
log.Println("Will stop publishing if an error is detected")
}
for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ { for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ {
s.grp.Add(1) s.grp.Add(1)
go func(i int) { go func(i int) {
@ -712,48 +725,9 @@ func (s *Sync) doSync() error {
return err return err
} }
func (s *Sync) startWorker(workerNum int) { func (s *Sync) startWorker(workerNum int) error {
var v ytapi.Video var v ytapi.Video
var more bool var more bool
for {
select {
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
}
select {
case v, more = <-s.queue:
if !more {
return
}
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
}
log.Println("================================================================================")
tryCount := 0
for {
select { // check again inside the loop so this dies faster
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
}
tryCount++
err := s.processVideo(v)
if err != nil {
util.SendToSlack("Tried to process %s. Error: %v", v.ID(), err)
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error())
log.Errorln(logMsg)
if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") {
return
}
fatalErrors := []string{ fatalErrors := []string{
":5279: read: connection reset by peer", ":5279: read: connection reset by peer",
"no space left on device", "no space left on device",
@ -765,9 +739,6 @@ func (s *Sync) startWorker(workerNum int) {
"You already have a stream claim published under the name", "You already have a stream claim published under the name",
"Missing inputs", "Missing inputs",
} }
if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.CliFlags.StopOnError {
s.grp.Stop()
} else if s.Manager.CliFlags.MaxTries > 1 {
errorsNoRetry := []string{ errorsNoRetry := []string{
"non 200 status code received", "non 200 status code received",
"This video contains content from", "This video contains content from",
@ -781,6 +752,7 @@ func (s *Sync) startWorker(workerNum int) {
"Client.Timeout exceeded while awaiting headers", "Client.Timeout exceeded while awaiting headers",
"the video is too big to sync, skipping for now", "the video is too big to sync, skipping for now",
"video is too long to process", "video is too long to process",
"video is too short to process",
"no compatible format available for this video", "no compatible format available for this video",
"Watch this video on YouTube.", "Watch this video on YouTube.",
"have blocked it on copyright grounds", "have blocked it on copyright grounds",
@ -795,13 +767,55 @@ func (s *Sync) startWorker(workerNum int) {
"This video is unavailable", "This video is unavailable",
"video is a live stream and hasn't completed yet", "video is a live stream and hasn't completed yet",
} }
if util.SubstringInSlice(err.Error(), errorsNoRetry) { walletErrors := []string{
log.Println("This error should not be retried at all") "Not enough funds to cover this transaction",
} else if tryCount < s.Manager.CliFlags.MaxTries { "failed: Not enough funds",
if util.SubstringInSlice(err.Error(), []string{ "Error in daemon: Insufficient funds, please deposit additional LBC",
//"Missing inputs",
}
blockchainErrors := []string{
"txn-mempool-conflict", "txn-mempool-conflict",
"too-long-mempool-chain", "too-long-mempool-chain",
}) { }
for {
select {
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return nil
default:
}
select {
case v, more = <-s.queue:
if !more {
return nil
}
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return nil
}
log.Println("================================================================================")
tryCount := 0
for {
select { // check again inside the loop so this dies faster
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return nil
default:
}
tryCount++
err := s.processVideo(v)
if err != nil {
logUtils.SendErrorToSlack("error processing video %s: %s", v.ID(), err.Error())
shouldRetry := s.Manager.CliFlags.MaxTries > 1 && !util.SubstringInSlice(err.Error(), errorsNoRetry) && tryCount < s.Manager.CliFlags.MaxTries
if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") {
s.grp.Stop()
} else if util.SubstringInSlice(err.Error(), fatalErrors) {
s.grp.Stop()
} else if shouldRetry {
if util.SubstringInSlice(err.Error(), blockchainErrors) {
log.Println("waiting for a block before retrying") log.Println("waiting for a block before retrying")
err := s.waitForNewBlock() err := s.waitForNewBlock()
if err != nil { if err != nil {
@ -809,12 +823,7 @@ func (s *Sync) startWorker(workerNum int) {
logUtils.SendErrorToSlack("something went wrong while waiting for a block: %s", errors.FullTrace(err)) logUtils.SendErrorToSlack("something went wrong while waiting for a block: %s", errors.FullTrace(err))
break break
} }
} else if util.SubstringInSlice(err.Error(), []string{ } else if util.SubstringInSlice(err.Error(), walletErrors) {
"Not enough funds to cover this transaction",
"failed: Not enough funds",
"Error in daemon: Insufficient funds, please deposit additional LBC",
//"Missing inputs",
}) {
log.Println("checking funds and UTXOs before retrying...") log.Println("checking funds and UTXOs before retrying...")
err := s.walletSetup() err := s.walletSetup()
if err != nil { if err != nil {
@ -828,8 +837,8 @@ func (s *Sync) startWorker(workerNum int) {
log.Println("Retrying") log.Println("Retrying")
continue continue
} }
logUtils.SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) logUtils.SendErrorToSlack("Video %s failed after %d retries, skipping. Stack: %s", tryCount, v.ID(), errors.FullTrace(err))
}
s.syncedVideosMux.RLock() s.syncedVideosMux.RLock()
existingClaim, ok := s.syncedVideos[v.ID()] existingClaim, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.RUnlock() s.syncedVideosMux.RUnlock()

View file

@ -33,6 +33,7 @@ var NeverRetryFailures = []string{
"Unable to extract signature tokens", "Unable to extract signature tokens",
"the video is too big to sync, skipping for now", "the video is too big to sync, skipping for now",
"video is too long to process", "video is too long to process",
"video is too short to process",
"This video contains content from", "This video contains content from",
"no compatible format available for this video", "no compatible format available for this video",
"Watch this video on YouTube.", "Watch this video on YouTube.",
@ -42,7 +43,6 @@ var NeverRetryFailures = []string{
} }
type SyncFlags struct { type SyncFlags struct {
StopOnError bool
TakeOverExistingChannel bool TakeOverExistingChannel bool
SkipSpaceCheck bool SkipSpaceCheck bool
SyncUpdate bool SyncUpdate bool

View file

@ -471,11 +471,16 @@ func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncPar
var err error var err error
dur := time.Duration(v.youtubeInfo.Duration) * time.Second dur := time.Duration(v.youtubeInfo.Duration) * time.Second
minDuration := 7 * time.Second
if dur > v.maxVideoLength { if dur > v.maxVideoLength {
logUtils.SendErrorToSlack("%s is %s long and the limit is %s", v.id, dur.String(), v.maxVideoLength.String()) logUtils.SendErrorToSlack("%s is %s long and the limit is %s", v.id, dur.String(), v.maxVideoLength.String())
return nil, errors.Err("video is too long to process") return nil, errors.Err("video is too long to process")
} }
if dur < minDuration {
logUtils.SendErrorToSlack("%s is %s long and the minimum is %s", v.id, dur.String(), minDuration.String())
return nil, errors.Err("video is too short to process")
}
for { for {
err = v.download() err = v.download()
if err != nil && strings.Contains(err.Error(), "HTTP Error 429") { if err != nil && strings.Contains(err.Error(), "HTTP Error 429") {