Mark video failed if we cannot get video info instead of failing the sync

Turn on error log after fixing lockup
Try different user agents if calls fail
Prevent double processing a channel in a row, just fail the sync instead
This commit is contained in:
Mark Beamer Jr 2020-08-03 01:05:03 -04:00 committed by Alex Grintsvayg
parent fc18151d77
commit 5be3551abe
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
3 changed files with 69 additions and 32 deletions

View file

@ -26,12 +26,13 @@ import (
func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) { func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) {
args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"} args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"}
ids, err := run(channelName, args, false, true, stopChan, pool) ids, err := run(channelName, args, true, true, stopChan, pool)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
videoIDs := make([]string, maxVideos) videoIDs := make([]string, maxVideos)
for i, v := range ids { for i, v := range ids {
logrus.Debugf("%d - video id %s", i, v)
if i >= maxVideos { if i >= maxVideos {
break break
} }
@ -44,7 +45,7 @@ const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)"
func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) { func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) {
args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID} args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID}
results, err := run(videoID, args, false, true, stopChan, pool) results, err := run(videoID, args, true, true, stopChan, pool)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -239,29 +240,15 @@ func getClient(ip *net.TCPAddr) *http.Client {
func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) { func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) {
var maxtries = 10 var maxtries = 10
var attemps int var attemps int
var useragent []string
for { for {
var sourceAddress string sourceAddress, err := getIPFromPool(use, stopChan, pool)
var err error if err != nil {
for { return nil, err
sourceAddress, err = pool.GetIP(use)
if err != nil {
if errors.Is(err, ip_manager.ErrAllThrottled) {
select {
case <-stopChan:
return nil, errors.Err("interrupted by user")
default:
time.Sleep(ip_manager.IPCooldownPeriod)
continue
}
} else {
return nil, err
}
}
break
} }
defer pool.ReleaseIP(sourceAddress) defer pool.ReleaseIP(sourceAddress)
argsForCommand := append(args, "--source-address", sourceAddress) argsForCommand := append(args, "--source-address", sourceAddress)
//argsForCommand = append(args, "--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)") argsForCommand = append(argsForCommand, useragent...)
cmd := exec.Command("youtube-dl", argsForCommand...) cmd := exec.Command("youtube-dl", argsForCommand...)
logrus.Printf("Running command youtube-dl %s", strings.Join(argsForCommand, " ")) logrus.Printf("Running command youtube-dl %s", strings.Join(argsForCommand, " "))
@ -273,10 +260,6 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
errorLog, err = ioutil.ReadAll(stderr)
if err != nil {
return nil, errors.Err(err)
}
} }
var stdout io.ReadCloser var stdout io.ReadCloser
@ -295,6 +278,12 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
if withStdErr {
errorLog, err = ioutil.ReadAll(stderr)
if err != nil {
return nil, errors.Err(err)
}
}
} }
done := make(chan error, 1) done := make(chan error, 1)
@ -316,13 +305,21 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C
logrus.Debugf("known throttling error...try again (%d)", attemps) logrus.Debugf("known throttling error...try again (%d)", attemps)
continue continue
} }
if strings.Contains(string(errorLog), "YouTube said: Unable to extract video data") {
useragent = []string{"--user-agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"}
if attemps == 1 {
useragent = []string{"--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"}
}
logrus.Debugf("known extraction issue, maybe user agent specification will work...try again (%d)", attemps)
continue
}
if attemps > maxtries { if attemps > maxtries {
logrus.Debug("too many tries returning failure") logrus.Debug("too many tries returning failure")
break break
} }
} }
logrus.Debugf("Unkown error, returning failure: %s", err.Error()) logrus.Debugf("Unkown error, returning failure: %s", err.Error())
return nil, errors.Prefix("youtube-dl "+strings.Join(argsForCommand, " "), err) return nil, errors.Prefix("youtube-dl "+strings.Join(argsForCommand, " ")+" ["+string(errorLog)+"] ", err)
} }
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
} }
@ -332,3 +329,25 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C
} }
} }
} }
func getIPFromPool(use string, stopChan stop.Chan, pool *ip_manager.IPPool) (sourceAddress string, err error) {
for {
sourceAddress, err = pool.GetIP(use)
if err != nil {
if errors.Is(err, ip_manager.ErrAllThrottled) {
select {
case <-stopChan:
return "", errors.Err("interrupted by user")
default:
time.Sleep(ip_manager.IPCooldownPeriod)
continue
}
} else {
return "", err
}
}
break
}
return
}

View file

@ -105,6 +105,7 @@ func (s *SyncManager) Start() error {
} }
} }
var lastChannelProcessed string
syncCount := 0 syncCount := 0
for { for {
err := s.checkUsedSpace() err := s.checkUsedSpace()
@ -205,6 +206,13 @@ func (s *SyncManager) Start() error {
time.Sleep(5 * time.Minute) time.Sleep(5 * time.Minute)
} }
for _, sync := range syncs { for _, sync := range syncs {
if lastChannelProcessed == sync.LbryChannelName {
util.SendToSlack("We just killed a sync for %s to stop looping!(%s)", sync.LbryChannelName, sync.YoutubeChannelID)
stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.LbryChannelName)
sync.setChannelTerminationStatus(&stopTheLoops)
continue
}
lastChannelProcessed = sync.LbryChannelName
shouldNotCount := false shouldNotCount := false
logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
err := sync.FullCycle() err := sync.FullCycle()

View file

@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/ytsync/v5/downloader" "github.com/lbryio/ytsync/v5/downloader"
@ -76,7 +78,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
mostRecentlyFailedChannel = channelID mostRecentlyFailedChannel = channelID
} }
vids, err := getVideos(config, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) vids, err := getVideos(config, channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -161,7 +163,7 @@ func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.Channe
return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil
} }
func getVideos(config *sdk.APIConfig, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) {
var videos []*ytdl.YtdlVideo var videos []*ytdl.YtdlVideo
for _, videoID := range videoIDs { for _, videoID := range videoIDs {
if len(videoID) < 5 { if len(videoID) < 5 {
@ -187,11 +189,19 @@ func getVideos(config *sdk.APIConfig, videoIDs []string, stopChan stop.Chan, ipP
} }
video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool) video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool)
if err != nil { if err != nil {
//ipPool.ReleaseIP(ip) errSDK := config.MarkVideoStatus(sdk.VideoStatus{
return nil, errors.Err(err) ChannelID: channelID,
VideoID: videoID,
Status: "failed",
FailureReason: err.Error(),
})
util.SendErrorToSlack("Skipping video: " + err.Error())
if errSDK != nil {
return nil, errors.Err(errSDK)
}
} else {
videos = append(videos, video)
} }
videos = append(videos, video)
//ipPool.ReleaseIP(ip)
} }
return videos, nil return videos, nil
} }