From 5be3551abee2cec6db15b096546aba2c87a5cf7d Mon Sep 17 00:00:00 2001 From: Mark Beamer Jr Date: Mon, 3 Aug 2020 01:05:03 -0400 Subject: [PATCH] Mark video failed if we cannot get video info instead of failing the sync Turn on error log after fixing lockup Try different user agents if calls fail Prevent double processing a channel in a row, just fail the sync instead --- downloader/downloader.go | 71 +++++++++++++++++++++++++--------------- manager/manager.go | 8 +++++ ytapi/ytapi.go | 22 +++++++++---- 3 files changed, 69 insertions(+), 32 deletions(-) diff --git a/downloader/downloader.go b/downloader/downloader.go index 7ad4155..eba0c1a 100644 --- a/downloader/downloader.go +++ b/downloader/downloader.go @@ -26,12 +26,13 @@ import ( func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) { args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"} - ids, err := run(channelName, args, false, true, stopChan, pool) + ids, err := run(channelName, args, true, true, stopChan, pool) if err != nil { return nil, errors.Err(err) } videoIDs := make([]string, maxVideos) for i, v := range ids { + logrus.Debugf("%d - video id %s", i, v) if i >= maxVideos { break } @@ -44,7 +45,7 @@ const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)" func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) { args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID} - results, err := run(videoID, args, false, true, stopChan, pool) + results, err := run(videoID, args, true, true, stopChan, pool) if err != nil { return nil, errors.Err(err) } @@ -239,29 +240,15 @@ func getClient(ip *net.TCPAddr) *http.Client { func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) { var maxtries = 10 var attemps int + var useragent []string for { - var sourceAddress string - var err error - for { - sourceAddress, err = pool.GetIP(use) - if err != nil { - if errors.Is(err, ip_manager.ErrAllThrottled) { - select { - case <-stopChan: - return nil, errors.Err("interrupted by user") - default: - time.Sleep(ip_manager.IPCooldownPeriod) - continue - } - } else { - return nil, err - } - } - break + sourceAddress, err := getIPFromPool(use, stopChan, pool) + if err != nil { + return nil, err } defer pool.ReleaseIP(sourceAddress) argsForCommand := append(args, "--source-address", sourceAddress) - //argsForCommand = append(args, "--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)") + argsForCommand = append(argsForCommand, useragent...) cmd := exec.Command("youtube-dl", argsForCommand...) logrus.Printf("Running command youtube-dl %s", strings.Join(argsForCommand, " ")) @@ -273,10 +260,6 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C if err != nil { return nil, errors.Err(err) } - errorLog, err = ioutil.ReadAll(stderr) - if err != nil { - return nil, errors.Err(err) - } } var stdout io.ReadCloser @@ -295,6 +278,12 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C if err != nil { return nil, errors.Err(err) } + if withStdErr { + errorLog, err = ioutil.ReadAll(stderr) + if err != nil { + return nil, errors.Err(err) + } + } } done := make(chan error, 1) @@ -316,13 +305,21 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C logrus.Debugf("known throttling error...try again (%d)", attemps) continue } + if strings.Contains(string(errorLog), "YouTube said: Unable to extract video data") { + useragent = []string{"--user-agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"} + if attemps == 1 { + useragent = []string{"--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"} + } + logrus.Debugf("known extraction issue, maybe user agent specification will work...try again (%d)", attemps) + continue + } if attemps > maxtries { logrus.Debug("too many tries returning failure") break } } logrus.Debugf("Unkown error, returning failure: %s", err.Error()) - return nil, errors.Prefix("youtube-dl "+strings.Join(argsForCommand, " "), err) + return nil, errors.Prefix("youtube-dl "+strings.Join(argsForCommand, " ")+" ["+string(errorLog)+"] ", err) } return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil } @@ -332,3 +329,25 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C } } } + +func getIPFromPool(use string, stopChan stop.Chan, pool *ip_manager.IPPool) (sourceAddress string, err error) { + for { + sourceAddress, err = pool.GetIP(use) + if err != nil { + if errors.Is(err, ip_manager.ErrAllThrottled) { + select { + case <-stopChan: + return "", errors.Err("interrupted by user") + + default: + time.Sleep(ip_manager.IPCooldownPeriod) + continue + } + } else { + return "", err + } + } + break + } + return +} diff --git a/manager/manager.go b/manager/manager.go index 2c84a9a..ac24ad7 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -105,6 +105,7 @@ func (s *SyncManager) Start() error { } } + var lastChannelProcessed string syncCount := 0 for { err := s.checkUsedSpace() @@ -205,6 +206,13 @@ func (s *SyncManager) Start() error { time.Sleep(5 * time.Minute) } for _, sync := range syncs { + if lastChannelProcessed == sync.LbryChannelName { + util.SendToSlack("We just killed a sync for %s to stop looping!(%s)", sync.LbryChannelName, sync.YoutubeChannelID) + stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.LbryChannelName) + sync.setChannelTerminationStatus(&stopTheLoops) + continue + } + lastChannelProcessed = sync.LbryChannelName shouldNotCount := false logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) err := sync.FullCycle() diff --git a/ytapi/ytapi.go b/ytapi/ytapi.go index 718f515..6b98406 100644 --- a/ytapi/ytapi.go +++ b/ytapi/ytapi.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/lbryio/ytsync/v5/util" + "github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/downloader" @@ -76,7 +78,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s mostRecentlyFailedChannel = channelID } - vids, err := getVideos(config, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) + vids, err := getVideos(config, channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) if err != nil { return nil, err } @@ -161,7 +163,7 @@ func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.Channe return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil } -func getVideos(config *sdk.APIConfig, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { +func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { var videos []*ytdl.YtdlVideo for _, videoID := range videoIDs { if len(videoID) < 5 { @@ -187,11 +189,19 @@ func getVideos(config *sdk.APIConfig, videoIDs []string, stopChan stop.Chan, ipP } video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool) if err != nil { - //ipPool.ReleaseIP(ip) - return nil, errors.Err(err) + errSDK := config.MarkVideoStatus(sdk.VideoStatus{ + ChannelID: channelID, + VideoID: videoID, + Status: "failed", + FailureReason: err.Error(), + }) + util.SendErrorToSlack("Skipping video: " + err.Error()) + if errSDK != nil { + return nil, errors.Err(errSDK) + } + } else { + videos = append(videos, video) } - videos = append(videos, video) - //ipPool.ReleaseIP(ip) } return videos, nil }