diff --git a/downloader/downloader.go b/downloader/downloader.go index 7ad4155..eba0c1a 100644 --- a/downloader/downloader.go +++ b/downloader/downloader.go @@ -26,12 +26,13 @@ import ( func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) { args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"} - ids, err := run(channelName, args, false, true, stopChan, pool) + ids, err := run(channelName, args, true, true, stopChan, pool) if err != nil { return nil, errors.Err(err) } videoIDs := make([]string, maxVideos) for i, v := range ids { + logrus.Debugf("%d - video id %s", i, v) if i >= maxVideos { break } @@ -44,7 +45,7 @@ const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)" func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) { args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID} - results, err := run(videoID, args, false, true, stopChan, pool) + results, err := run(videoID, args, true, true, stopChan, pool) if err != nil { return nil, errors.Err(err) } @@ -239,29 +240,15 @@ func getClient(ip *net.TCPAddr) *http.Client { func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) { var maxtries = 10 var attemps int + var useragent []string for { - var sourceAddress string - var err error - for { - sourceAddress, err = pool.GetIP(use) - if err != nil { - if errors.Is(err, ip_manager.ErrAllThrottled) { - select { - case <-stopChan: - return nil, errors.Err("interrupted by user") - default: - time.Sleep(ip_manager.IPCooldownPeriod) - continue - } - } else { - return nil, err - } - } - break + sourceAddress, err := getIPFromPool(use, stopChan, pool) + if err != nil { + return nil, err } defer pool.ReleaseIP(sourceAddress) argsForCommand := append(args, "--source-address", sourceAddress) - //argsForCommand = append(args, "--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)") + argsForCommand = append(argsForCommand, useragent...) cmd := exec.Command("youtube-dl", argsForCommand...) logrus.Printf("Running command youtube-dl %s", strings.Join(argsForCommand, " ")) @@ -273,10 +260,6 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C if err != nil { return nil, errors.Err(err) } - errorLog, err = ioutil.ReadAll(stderr) - if err != nil { - return nil, errors.Err(err) - } } var stdout io.ReadCloser @@ -295,6 +278,12 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C if err != nil { return nil, errors.Err(err) } + if withStdErr { + errorLog, err = ioutil.ReadAll(stderr) + if err != nil { + return nil, errors.Err(err) + } + } } done := make(chan error, 1) @@ -316,13 +305,21 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C logrus.Debugf("known throttling error...try again (%d)", attemps) continue } + if strings.Contains(string(errorLog), "YouTube said: Unable to extract video data") { + useragent = []string{"--user-agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"} + if attemps == 1 { + useragent = []string{"--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"} + } + logrus.Debugf("known extraction issue, maybe user agent specification will work...try again (%d)", attemps) + continue + } if attemps > maxtries { logrus.Debug("too many tries returning failure") break } } logrus.Debugf("Unkown error, returning failure: %s", err.Error()) - return nil, errors.Prefix("youtube-dl "+strings.Join(argsForCommand, " "), err) + return nil, errors.Prefix("youtube-dl "+strings.Join(argsForCommand, " ")+" ["+string(errorLog)+"] ", err) } return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil } @@ -332,3 +329,25 @@ func run(use string, args []string, withStdErr, withStdOut bool, stopChan stop.C } } } + +func getIPFromPool(use string, stopChan stop.Chan, pool *ip_manager.IPPool) (sourceAddress string, err error) { + for { + sourceAddress, err = pool.GetIP(use) + if err != nil { + if errors.Is(err, ip_manager.ErrAllThrottled) { + select { + case <-stopChan: + return "", errors.Err("interrupted by user") + + default: + time.Sleep(ip_manager.IPCooldownPeriod) + continue + } + } else { + return "", err + } + } + break + } + return +} diff --git a/manager/manager.go b/manager/manager.go index 2c84a9a..ac24ad7 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -105,6 +105,7 @@ func (s *SyncManager) Start() error { } } + var lastChannelProcessed string syncCount := 0 for { err := s.checkUsedSpace() @@ -205,6 +206,13 @@ func (s *SyncManager) Start() error { time.Sleep(5 * time.Minute) } for _, sync := range syncs { + if lastChannelProcessed == sync.LbryChannelName { + util.SendToSlack("We just killed a sync for %s to stop looping!(%s)", sync.LbryChannelName, sync.YoutubeChannelID) + stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.LbryChannelName) + sync.setChannelTerminationStatus(&stopTheLoops) + continue + } + lastChannelProcessed = sync.LbryChannelName shouldNotCount := false logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) err := sync.FullCycle() diff --git a/ytapi/ytapi.go b/ytapi/ytapi.go index 718f515..6b98406 100644 --- a/ytapi/ytapi.go +++ b/ytapi/ytapi.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/lbryio/ytsync/v5/util" + "github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/downloader" @@ -76,7 +78,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s mostRecentlyFailedChannel = channelID } - vids, err := getVideos(config, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) + vids, err := getVideos(config, channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) if err != nil { return nil, err } @@ -161,7 +163,7 @@ func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.Channe return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil } -func getVideos(config *sdk.APIConfig, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { +func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { var videos []*ytdl.YtdlVideo for _, videoID := range videoIDs { if len(videoID) < 5 { @@ -187,11 +189,19 @@ func getVideos(config *sdk.APIConfig, videoIDs []string, stopChan stop.Chan, ipP } video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool) if err != nil { - //ipPool.ReleaseIP(ip) - return nil, errors.Err(err) + errSDK := config.MarkVideoStatus(sdk.VideoStatus{ + ChannelID: channelID, + VideoID: videoID, + Status: "failed", + FailureReason: err.Error(), + }) + util.SendErrorToSlack("Skipping video: " + err.Error()) + if errSDK != nil { + return nil, errors.Err(errSDK) + } + } else { + videos = append(videos, video) } - videos = append(videos, video) - //ipPool.ReleaseIP(ip) } return videos, nil }