diff --git a/downloader/downloader.go b/downloader/downloader.go index 441497e..e233253 100644 --- a/downloader/downloader.go +++ b/downloader/downloader.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "net/url" "os/exec" @@ -15,14 +16,15 @@ import ( "github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" "github.com/sirupsen/logrus" ) -func GetPlaylistVideoIDs(channelName string, maxVideos int) ([]string, error) { +func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan) ([]string, error) { args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"} - ids, err := run(args, false, true) + ids, err := run(args, false, true, stopChan) if err != nil { return nil, errors.Err(err) } @@ -36,48 +38,51 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int) ([]string, error) { return videoIDs, nil } -func GetVideoInformation(videoID string) (*ytdl.YtdlVideo, error) { - //args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID} - //results, err := run(args, false, true) - //if err != nil { - // return nil, errors.Err(err) - //} +func GetVideoInformation(videoID string, stopChan stop.Chan, ip *net.TCPAddr) (*ytdl.YtdlVideo, error) { + args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID} + results, err := run(args, false, true, stopChan) + if err != nil { + return nil, errors.Err(err) + } var video *ytdl.YtdlVideo - //err = json.Unmarshal([]byte(results[0]), &video) - //if err != nil { - // return nil, errors.Err(err) - //} - - video = &ytdl.YtdlVideo{} + err = json.Unmarshal([]byte(results[0]), &video) + if err != nil { + return nil, errors.Err(err) + } // now get an accurate time const maxTries = 5 tries := 0 GetTime: tries++ - t, err := getUploadTime(videoID) + t, err := getUploadTime(videoID, ip) if err != nil { - slack(":warning: Upload time error: %v", err) + //slack(":warning: Upload time error: %v", err) if tries <= maxTries && (errors.Is(err, errNotScraped) || errors.Is(err, errUploadTimeEmpty)) { - triggerScrape(videoID) - time.Sleep(2 * time.Second) // let them scrape it - goto GetTime + err := triggerScrape(videoID, ip) + if err == nil { + time.Sleep(2 * time.Second) // let them scrape it + goto GetTime + } else { + //slack("triggering scrape returned error: %v", err) + } } else if !errors.Is(err, errNotScraped) && !errors.Is(err, errUploadTimeEmpty) { - slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err) + //slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err) return nil, errors.Err(err) } // do fallback below } - slack("After all that, upload time for %s is %s", videoID, t) + //slack("After all that, upload time for %s is %s", videoID, t) if t != "" { parsed, err := time.Parse("2006-01-02, 15:04:05 (MST)", t) // this will probably be UTC, but Go's timezone parsing is fucked up. it ignores the timezone in the date if err != nil { return nil, errors.Err(err) } + slack(":exclamation: Got an accurate time for %s", videoID) video.UploadDateForReal = parsed } else { - slack(":warning: Could not get accurate time for %s. Falling back to estimated time.", videoID) + //slack(":warning: Could not get accurate time for %s. Falling back to time from upload ytdl: %s.", videoID, video.UploadDate) // fall back to UploadDate from youtube-dl video.UploadDateForReal, err = time.Parse("20060102", video.UploadDate) if err != nil { @@ -96,30 +101,62 @@ func slack(format string, a ...interface{}) { util.SendToSlack(format, a...) } -func triggerScrape(videoID string) error { - slack("Triggering scrape for %s", videoID) +func triggerScrape(videoID string, ip *net.TCPAddr) error { + //slack("Triggering scrape for %s", videoID) u, err := url.Parse("https://caa.iti.gr/verify_videoV3") q := u.Query() q.Set("twtimeline", "0") q.Set("url", "https://www.youtube.com/watch?v="+videoID) u.RawQuery = q.Encode() - slack("GET %s", u.String()) - res, err := http.Get(u.String()) + //slack("GET %s", u.String()) + + client := getClient(ip) + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return errors.Err(err) + } + req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36") + + res, err := client.Do(req) if err != nil { return errors.Err(err) } defer res.Body.Close() - all, err := ioutil.ReadAll(res.Body) - spew.Dump(string(all), err) + var response struct { + Message string `json:"message"` + Status string `json:"status"` + VideoURL string `json:"video_url"` + } + err = json.NewDecoder(res.Body).Decode(&response) + if err != nil { + return errors.Err(err) + } + + switch response.Status { + case "removed_video": + return errors.Err("video previously removed from service") + case "no_video": + return errors.Err("they say 'video cannot be found'. wtf?") + default: + spew.Dump(response) + } return nil //https://caa.iti.gr/caa/api/v4/videos/reports/h-tuxHS5lSM } -func getUploadTime(videoID string) (string, error) { - slack("Getting upload time for %s", videoID) - res, err := http.Get("https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v=" + videoID) +func getUploadTime(videoID string, ip *net.TCPAddr) (string, error) { + //slack("Getting upload time for %s", videoID) + + client := getClient(ip) + req, err := http.NewRequest(http.MethodGet, "https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v="+videoID, nil) + if err != nil { + return "", errors.Err(err) + } + req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36") + + res, err := client.Do(req) if err != nil { return "", errors.Err(err) } @@ -139,6 +176,10 @@ func getUploadTime(videoID string) (string, error) { return "", errNotScraped } + if uploadTime.Status == "" && strings.HasPrefix(uploadTime.Message, "CANNOT_RETRIEVE_REPORT_FOR_VIDEO_") { + return "", errors.Err("cannot retrieve report for video") + } + if uploadTime.Time == "" { return "", errUploadTimeEmpty } @@ -146,7 +187,28 @@ func getUploadTime(videoID string) (string, error) { return uploadTime.Time, nil } -func run(args []string, withStdErr, withStdOut bool) ([]string, error) { +func getClient(ip *net.TCPAddr) *http.Client { + if ip == nil { + return http.DefaultClient + } + + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + LocalAddr: ip, + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } +} + +func run(args []string, withStdErr, withStdOut bool, stopChan stop.Chan) ([]string, error) { cmd := exec.Command("youtube-dl", args...) logrus.Printf("Running command youtube-dl %s", strings.Join(args, " ")) @@ -181,10 +243,23 @@ func run(args []string, withStdErr, withStdOut bool) ([]string, error) { return nil, errors.Err(err) } } - err := cmd.Wait() - if len(errorLog) > 0 { - return nil, errors.Err(err) + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + select { + case <-stopChan: + if err := cmd.Process.Kill(); err != nil { + return nil, errors.Prefix("failed to kill command after stopper cancellation", err) + } + return nil, errors.Err("canceled by stopper") + case err := <-done: + if err != nil { + return nil, errors.Prefix("youtube-dl "+strings.Join(args, " "), err) + } } + if len(errorLog) > 0 { return nil, errors.Err(string(errorLog)) } diff --git a/downloader/downloader_test.go b/downloader/downloader_test.go index 1a80d6d..1f065e6 100644 --- a/downloader/downloader_test.go +++ b/downloader/downloader_test.go @@ -17,7 +17,7 @@ func TestGetPlaylistVideoIDs(t *testing.T) { } func TestGetVideoInformation(t *testing.T) { - video, err := GetVideoInformation("zj7pXM9gE5M") + video, err := GetVideoInformation("zj7pXM9gE5M", nil) if err != nil { logrus.Error(err) } diff --git a/ip_manager/throttle.go b/ip_manager/throttle.go index 43be077..e37ba69 100644 --- a/ip_manager/throttle.go +++ b/ip_manager/throttle.go @@ -63,21 +63,21 @@ func GetIPPool(stopGrp *stop.Group) (*IPPool, error) { lock: &sync.RWMutex{}, stopGrp: stopGrp, } - ticker := time.NewTicker(10 * time.Second) - go func() { - for { - select { - case <-stopGrp.Ch(): - return - case <-ticker.C: - ipPoolInstance.lock.RLock() - for _, ip := range ipPoolInstance.ips { - log.Debugf("IP: %s\tInUse: %t\tVideoID: %s\tThrottled: %t\tLastUse: %.1f", ip.IP, ip.InUse, ip.UsedForVideo, ip.Throttled, time.Since(ip.LastUse).Seconds()) - } - ipPoolInstance.lock.RUnlock() - } - } - }() + //ticker := time.NewTicker(10 * time.Second) + //go func() { + // for { + // select { + // case <-stopGrp.Ch(): + // return + // case <-ticker.C: + // ipPoolInstance.lock.RLock() + // for _, ip := range ipPoolInstance.ips { + // log.Debugf("IP: %s\tInUse: %t\tVideoID: %s\tThrottled: %t\tLastUse: %.1f", ip.IP, ip.InUse, ip.UsedForVideo, ip.Throttled, time.Since(ip.LastUse).Seconds()) + // } + // ipPoolInstance.lock.RUnlock() + // } + // } + //}() return ipPoolInstance, nil } @@ -108,7 +108,7 @@ func AllInUse(ips []throttledIP) bool { func (i *IPPool) ReleaseIP(ip string) { i.lock.Lock() defer i.lock.Unlock() - for j, _ := range i.ips { + for j := range i.ips { localIP := &i.ips[j] if localIP.IP == ip { localIP.InUse = false @@ -122,7 +122,7 @@ func (i *IPPool) ReleaseIP(ip string) { func (i *IPPool) ReleaseAll() { i.lock.Lock() defer i.lock.Unlock() - for j, _ := range i.ips { + for j := range i.ips { if i.ips[j].Throttled { continue } @@ -183,7 +183,7 @@ func (i *IPPool) nextIP(forVideo string) (*throttledIP, error) { } var nextIP *throttledIP - for j, _ := range i.ips { + for j := range i.ips { ip := &i.ips[j] if ip.InUse || ip.Throttled { continue diff --git a/main.go b/main.go index 6ff9112..b36233c 100644 --- a/main.go +++ b/main.go @@ -7,10 +7,8 @@ import ( "os" "time" - "github.com/davecgh/go-spew/spew" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/lbryio/ytsync/v5/downloader" "github.com/lbryio/ytsync/v5/manager" "github.com/lbryio/ytsync/v5/sdk" ytUtils "github.com/lbryio/ytsync/v5/util" @@ -40,8 +38,23 @@ var ( ) func main() { - spew.Dump(downloader.GetVideoInformation("oahaMa3XB0k")) - return + //grp := stop.New() + //ipPool, err := ip_manager.GetIPPool(grp) + //if err != nil { + // panic(err) + //} + // + //videoID := "vtIzMaLkCaM" + // + //ip, err := ipPool.GetIP(videoID) + //if err != nil { + // panic(err) + //} + // + //spew.Dump(ip) + // + //spew.Dump(downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)})) + //return rand.Seed(time.Now().UnixNano()) log.SetLevel(log.DebugLevel) diff --git a/manager/manager.go b/manager/manager.go index 50737af..2c84a9a 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -234,7 +234,7 @@ func (s *SyncManager) Start() error { } shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server") if !shouldNotCount { - logUtils.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) + logUtils.SendInfoToSlack("A non fatal error was reported by the sync process.\n%s", errors.FullTrace(err)) } } err = blobs_reflector.ReflectAndClean() diff --git a/manager/ytsync.go b/manager/ytsync.go index 97eaaa0..c2f4450 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -253,6 +253,8 @@ func (s *Sync) FullCycle() (e error) { util.SendToSlack("got interrupt, shutting down") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() + time.Sleep(5 * time.Second) + debug.PrintStack() // so we can figure out what's not stopping }() err := s.setStatusSyncing() if err != nil { @@ -854,6 +856,12 @@ func (s *Sync) startWorker(workerNum int) { tryCount := 0 for { + select { // check again inside the loop so this dies faster + case <-s.grp.Ch(): + log.Printf("Stopping worker %d", workerNum) + return + default: + } tryCount++ err := s.processVideo(v) @@ -988,10 +996,10 @@ func (s *Sync) enqueueYoutubeVideos() error { return err } - videos, err := ytapi.GetVideosToSync(s.APIConfig.YoutubeAPIKey, s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{ + videos, err := ytapi.GetVideosToSync(s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{ VideoDir: s.videoDirectory, S3Config: s.Manager.GetS3AWSConfig(), - Grp: s.grp, + Stopper: s.grp, IPPool: ipPool, }) if err != nil { diff --git a/ytapi/ytapi.go b/ytapi/ytapi.go index db354ec..c0e1b99 100644 --- a/ytapi/ytapi.go +++ b/ytapi/ytapi.go @@ -45,19 +45,19 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[ type VideoParams struct { VideoDir string S3Config aws.Config - Grp *stop.Group + Stopper *stop.Group IPPool *ip_manager.IPPool } var mostRecentlyFailedChannel string // TODO: fix this hack! -func GetVideosToSync(apiKey, channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams) ([]Video, error) { +func GetVideosToSync(channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams) ([]Video, error) { var videos []Video if quickSync { maxVideos = 50 } - videoIDs, err := downloader.GetPlaylistVideoIDs(channelID, maxVideos) + videoIDs, err := downloader.GetPlaylistVideoIDs(channelID, maxVideos, videoParams.Stopper.Ch()) if err != nil { return nil, errors.Err(err) } @@ -76,14 +76,14 @@ func GetVideosToSync(apiKey, channelID string, syncedVideos map[string]sdk.Synce mostRecentlyFailedChannel = channelID } - vids, err := getVideos(videoIDs) + vids, err := getVideos(videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) if err != nil { return nil, err } for _, item := range vids { positionInList := playlistMap[item.ID] - videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Grp, videoParams.IPPool) + videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool) if err != nil { return nil, errors.Err(err) } @@ -95,7 +95,7 @@ func GetVideosToSync(apiKey, channelID string, syncedVideos map[string]sdk.Synce continue } if _, ok := playlistMap[k]; !ok { - videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Grp, videoParams.IPPool)) + videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool)) } } @@ -143,6 +143,7 @@ func CountVideosInChannel(channelID string) (int, error) { func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.ChannelBrandingSettings, error) { return nil, nil, errors.Err("ChannelInfo doesn't work yet because we're focused on existing channels") + service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}}) if err != nil { return nil, nil, errors.Prefix("error creating YouTube service", err) @@ -160,14 +161,27 @@ func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.Channe return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil } -func getVideos(videoIDs []string) ([]*ytdl.YtdlVideo, error) { +func getVideos(videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { var videos []*ytdl.YtdlVideo for _, videoID := range videoIDs { - video, err := downloader.GetVideoInformation(videoID) + select { + case <-stopChan: + return videos, errors.Err("canceled by stopper") + default: + } + + //ip, err := ipPool.GetIP(videoID) + //if err != nil { + // return nil, err + //} + //video, err := downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)}) + video, err := downloader.GetVideoInformation(videoID, stopChan, nil) if err != nil { + //ipPool.ReleaseIP(ip) return nil, errors.Err(err) } videos = append(videos, video) + //ipPool.ReleaseIP(ip) } return videos, nil }