ytsync/downloader/downloader.go

398 lines
11 KiB
Go
Raw Normal View History

package downloader
import (
2020-07-27 23:14:06 +02:00
"encoding/json"
"fmt"
"io/ioutil"
2021-03-01 23:34:19 +01:00
"math"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strings"
"time"
"github.com/davecgh/go-spew/spew"
2020-07-27 23:14:06 +02:00
"github.com/lbryio/ytsync/v5/downloader/ytdl"
2020-07-30 18:48:05 +02:00
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/sdk"
2021-06-18 03:09:19 +02:00
"github.com/lbryio/ytsync/v5/shared"
util2 "github.com/lbryio/ytsync/v5/util"
2020-07-27 23:14:06 +02:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/sirupsen/logrus"
)
2020-07-30 18:48:05 +02:00
func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) {
2021-06-25 19:16:01 +02:00
args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName + "/videos", "--get-id", "--flat-playlist", "--cookies", "cookies.txt", "--playlist-end", fmt.Sprintf("%d", maxVideos)}
2021-06-18 03:09:19 +02:00
ids, err := run(channelName, args, stopChan, pool)
2020-07-27 21:57:19 +02:00
if err != nil {
return nil, errors.Err(err)
}
2020-09-25 21:44:14 +02:00
videoIDs := make([]string, 0, maxVideos)
2020-07-27 21:57:19 +02:00
for i, v := range ids {
if v == "" {
continue
}
logrus.Debugf("%d - video id %s", i, v)
2020-07-27 21:57:19 +02:00
if i >= maxVideos {
break
}
2020-09-25 21:44:14 +02:00
videoIDs = append(videoIDs, v)
2020-07-27 21:57:19 +02:00
}
return videoIDs, nil
}
// releaseTimeFormat is the time layout getUploadTime uses when it formats an
// upload time as a string.
const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)"
2020-07-30 18:48:05 +02:00
func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) {
args := []string{
"--skip-download",
"--write-info-json",
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
"--cookies",
"cookies.txt",
"-o",
path.Join(util2.GetVideoMetadataDir(), videoID),
}
2021-06-18 03:09:19 +02:00
_, err := run(videoID, args, stopChan, pool)
if err != nil {
return nil, errors.Err(err)
}
f, err := os.Open(path.Join(util2.GetVideoMetadataDir(), videoID+".info.json"))
if err != nil {
return nil, errors.Err(err)
}
// defer the closing of our jsonFile so that we can parse it later on
defer f.Close()
// read our opened jsonFile as a byte array.
byteValue, _ := ioutil.ReadAll(f)
2020-07-27 23:14:06 +02:00
var video *ytdl.YtdlVideo
err = json.Unmarshal(byteValue, &video)
if err != nil {
return nil, errors.Err(err)
}
// now get an accurate time
const maxTries = 5
tries := 0
GetTime:
tries++
t, err := getUploadTime(config, videoID, ip, video.UploadDate)
if err != nil {
//slack(":warning: Upload time error: %v", err)
if tries <= maxTries && (errors.Is(err, errNotScraped) || errors.Is(err, errUploadTimeEmpty) || errors.Is(err, errStatusParse) || errors.Is(err, errConnectionIssue)) {
err := triggerScrape(videoID, ip)
if err == nil {
time.Sleep(2 * time.Second) // let them scrape it
goto GetTime
} else {
//slack("triggering scrape returned error: %v", err)
}
} else if !errors.Is(err, errNotScraped) && !errors.Is(err, errUploadTimeEmpty) {
//slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err)
2020-08-06 02:12:05 +02:00
if t == "" {
return nil, errors.Err(err)
} else {
t = "" //TODO: get rid of the other piece below?
}
}
// do fallback below
}
//slack("After all that, upload time for %s is %s", videoID, t)
if t != "" {
parsed, err := time.Parse("2006-01-02, 15:04:05 (MST)", t) // this will probably be UTC, but Go's timezone parsing is fucked up. it ignores the timezone in the date
if err != nil {
return nil, errors.Err(err)
}
//slack(":exclamation: Got an accurate time for %s", videoID)
video.UploadDateForReal = parsed
2020-08-06 02:12:05 +02:00
} else { //TODO: this is the piece that isn't needed!
slack(":warning: Could not get accurate time for %s. Falling back to time from upload ytdl: %s.", videoID, video.UploadDate)
// fall back to UploadDate from youtube-dl
video.UploadDateForReal, err = time.Parse("20060102", video.UploadDate)
if err != nil {
return nil, err
}
}
2020-07-27 23:14:06 +02:00
return video, nil
}
// errNotScraped is returned by getUploadTime when the verification API
// answers with status "ERROR1".
var errNotScraped = errors.Base("not yet scraped by caa.iti.gr")

// errUploadTimeEmpty is returned by getUploadTime when the verification API
// response carries an empty video_upload_time.
var errUploadTimeEmpty = errors.Base("upload time is empty")

// errStatusParse is returned by triggerScrape when decoding the response
// fails with a "cannot unmarshal number" error.
var errStatusParse = errors.Base("could not parse status, got number, need string")

// errConnectionIssue is returned by triggerScrape when decoding the response
// fails with an error mentioning "no route to host".
var errConnectionIssue = errors.Base("there was a connection issue with the api")
// slack writes the formatted message, followed by a newline, to standard
// output and then hands the same format and arguments to util.SendToSlack.
func slack(format string, a ...interface{}) {
	line := format + "\n"
	fmt.Fprintf(os.Stdout, line, a...)
	util.SendToSlack(format, a...)
}
func triggerScrape(videoID string, ip *net.TCPAddr) error {
//slack("Triggering scrape for %s", videoID)
u, err := url.Parse("https://caa.iti.gr/verify_videoV3")
q := u.Query()
q.Set("twtimeline", "0")
q.Set("url", "https://www.youtube.com/watch?v="+videoID)
u.RawQuery = q.Encode()
//slack("GET %s", u.String())
client := getClient(ip)
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return errors.Err(err)
}
req.Header.Set("User-Agent", ChromeUA)
res, err := client.Do(req)
if err != nil {
return errors.Err(err)
}
defer res.Body.Close()
var response struct {
Message string `json:"message"`
Status string `json:"status"`
VideoURL string `json:"video_url"`
}
err = json.NewDecoder(res.Body).Decode(&response)
if err != nil {
if strings.Contains(err.Error(), "cannot unmarshal number") {
return errors.Err(errStatusParse)
}
if strings.Contains(err.Error(), "no route to host") {
return errors.Err(errConnectionIssue)
}
return errors.Err(err)
}
switch response.Status {
case "removed_video":
return errors.Err("video previously removed from service")
case "no_video":
return errors.Err("they say 'video cannot be found'. wtf?")
default:
spew.Dump(response)
}
return nil
//https://caa.iti.gr/caa/api/v4/videos/reports/h-tuxHS5lSM
}
func getUploadTime(config *sdk.APIConfig, videoID string, ip *net.TCPAddr, uploadDate string) (string, error) {
//slack("Getting upload time for %s", videoID)
release, err := config.GetReleasedDate(videoID)
if err != nil {
2020-08-06 00:25:54 +02:00
logrus.Error(err)
}
2020-08-18 00:03:38 +02:00
ytdlUploadDate, err := time.Parse("20060102", uploadDate)
if err != nil {
logrus.Error(err)
}
2020-08-06 00:25:54 +02:00
if release != nil {
2020-08-06 02:12:05 +02:00
//const sqlTimeFormat = "2006-01-02 15:04:05"
sqlTime, err := time.ParseInLocation(time.RFC3339, release.ReleaseTime, time.UTC)
2020-08-06 00:25:54 +02:00
if err == nil {
2021-03-01 23:34:19 +01:00
hoursDiff := math.Abs(sqlTime.Sub(ytdlUploadDate).Hours())
if hoursDiff > 48 {
logrus.Infof("upload day from APIs differs from the ytdl one by more than 2 days.")
2020-08-18 00:03:38 +02:00
} else {
return sqlTime.Format(releaseTimeFormat), nil
}
2020-08-06 02:12:05 +02:00
} else {
logrus.Error(err)
}
}
2020-08-18 00:03:38 +02:00
if time.Now().AddDate(0, 0, -3).After(ytdlUploadDate) {
return ytdlUploadDate.Format(releaseTimeFormat), nil
}
client := getClient(ip)
req, err := http.NewRequest(http.MethodGet, "https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v="+videoID, nil)
if err != nil {
2020-08-06 02:12:05 +02:00
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
req.Header.Set("User-Agent", ChromeUA)
res, err := client.Do(req)
if err != nil {
2020-08-06 02:12:05 +02:00
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
defer res.Body.Close()
var uploadTime struct {
Time string `json:"video_upload_time"`
Message string `json:"message"`
Status string `json:"status"`
}
err = json.NewDecoder(res.Body).Decode(&uploadTime)
if err != nil {
2020-08-06 02:12:05 +02:00
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
if uploadTime.Status == "ERROR1" {
2020-08-06 02:12:05 +02:00
return ytdlUploadDate.Format(releaseTimeFormat), errNotScraped
}
if uploadTime.Status == "" && strings.HasPrefix(uploadTime.Message, "CANNOT_RETRIEVE_REPORT_FOR_VIDEO_") {
2020-08-06 02:12:05 +02:00
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err("cannot retrieve report for video")
}
if uploadTime.Time == "" {
2020-08-06 02:12:05 +02:00
return ytdlUploadDate.Format(releaseTimeFormat), errUploadTimeEmpty
}
return uploadTime.Time, nil
2020-07-27 23:14:06 +02:00
}
// getClient returns the HTTP client to use for outgoing API requests.
//
// With a nil ip it returns http.DefaultClient. Otherwise it builds a new
// client whose transport dials from ip as the local address, using the
// timeouts and idle-connection limits set below.
func getClient(ip *net.TCPAddr) *http.Client {
	if ip != nil {
		dialer := &net.Dialer{
			LocalAddr: ip,
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}
		transport := &http.Transport{
			Proxy:                 http.ProxyFromEnvironment,
			DialContext:           dialer.DialContext,
			MaxIdleConns:          100,
			IdleConnTimeout:       90 * time.Second,
			TLSHandshakeTimeout:   10 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
		}
		return &http.Client{Transport: transport}
	}
	return http.DefaultClient
}
2020-08-12 19:44:57 +02:00
const (
	// User-agent strings. nextUA passes them to yt-dlp via --user-agent, and
	// ChromeUA is also sent on the caa.iti.gr API requests.
	GoogleBotUA = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
	ChromeUA    = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36"

	// maxAttempts bounds the retry loop in run.
	maxAttempts = 3

	// Substrings that run looks for in yt-dlp error output. youtubeDlError
	// gates the handling, extractionError switches the user agent, and the
	// two throttling markers flag the source IP as throttled.
	// videoPremiereError and liveEventError are not referenced by the code
	// visible in this file.
	extractionError         = "YouTube said: Unable to extract video data"
	throttledError          = "HTTP Error 429"
	AlternateThrottledError = "returned non-zero exit status 8"
	youtubeDlError          = "exit status 1"
	videoPremiereError      = "Premieres in"
	liveEventError          = "This live event will begin in"
)
2021-06-18 03:09:19 +02:00
func run(use string, args []string, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) {
var useragent []string
2020-08-12 19:44:57 +02:00
var lastError error
for attempts := 0; attempts < maxAttempts; attempts++ {
sourceAddress, err := getIPFromPool(use, stopChan, pool)
if err != nil {
return nil, err
2020-07-30 18:03:07 +02:00
}
2020-07-30 19:05:12 +02:00
argsForCommand := append(args, "--source-address", sourceAddress)
argsForCommand = append(argsForCommand, useragent...)
binary := "yt-dlp"
2021-02-22 17:29:52 +01:00
cmd := exec.Command(binary, argsForCommand...)
2020-08-12 19:44:57 +02:00
res, err := runCmd(cmd, stopChan)
pool.ReleaseIP(sourceAddress)
if err == nil {
return res, nil
2020-07-27 23:14:06 +02:00
}
2020-08-12 19:44:57 +02:00
lastError = err
if strings.Contains(err.Error(), youtubeDlError) {
2021-06-18 03:09:19 +02:00
if util.SubstringInSlice(err.Error(), shared.ErrorsNoRetry) {
break
}
2020-08-12 19:44:57 +02:00
if strings.Contains(err.Error(), extractionError) {
logrus.Warnf("known extraction error: %s", errors.FullTrace(err))
useragent = nextUA(useragent)
2020-07-30 18:14:06 +02:00
}
2020-08-12 19:44:57 +02:00
if strings.Contains(err.Error(), throttledError) || strings.Contains(err.Error(), AlternateThrottledError) {
pool.SetThrottled(sourceAddress)
//we don't want throttle errors to count toward the max retries
attempts--
2020-07-30 18:14:06 +02:00
}
}
2020-08-12 19:44:57 +02:00
}
return nil, lastError
}
2020-08-12 19:44:57 +02:00
func nextUA(current []string) []string {
if len(current) == 0 {
return []string{"--user-agent", GoogleBotUA}
2020-08-12 19:44:57 +02:00
}
return []string{"--user-agent", ChromeUA}
2020-08-12 19:44:57 +02:00
}
func runCmd(cmd *exec.Cmd, stopChan stop.Chan) ([]string, error) {
logrus.Infof("running yt-dlp cmd: %s", strings.Join(cmd.Args, " "))
2020-08-12 19:44:57 +02:00
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, errors.Err(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, errors.Err(err)
}
err = cmd.Start()
if err != nil {
return nil, errors.Err(err)
}
outLog, err := ioutil.ReadAll(stdout)
if err != nil {
return nil, errors.Err(err)
}
errorLog, err := ioutil.ReadAll(stderr)
if err != nil {
return nil, errors.Err(err)
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-stopChan:
err := cmd.Process.Kill()
if err != nil {
return nil, errors.Prefix("failed to kill command after stopper cancellation", err)
}
2020-10-07 03:40:19 +02:00
return nil, errors.Err("interrupted by user")
2020-08-12 19:44:57 +02:00
case err := <-done:
if err != nil {
return nil, errors.Prefix("yt-dlp "+strings.Join(cmd.Args, " ")+" ["+string(errorLog)+"]", err)
2020-07-30 18:14:06 +02:00
}
2020-08-12 19:44:57 +02:00
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
}
}
// getIPFromPool asks pool for a source address for the given use and blocks
// until one is available.
//
// While the pool reports ip_manager.ErrAllThrottled it waits for
// ip_manager.IPCooldownPeriod and tries again. The wait is aborted as soon as
// stopChan fires, in which case an "interrupted by user" error is returned.
// Any other pool error is returned as is.
func getIPFromPool(use string, stopChan stop.Chan, pool *ip_manager.IPPool) (sourceAddress string, err error) {
	for {
		sourceAddress, err = pool.GetIP(use)
		if err == nil {
			return sourceAddress, nil
		}
		if !errors.Is(err, ip_manager.ErrAllThrottled) {
			return "", err
		}

		// Wait out the cool-down, but stay responsive to a stop request
		// instead of sleeping through it.
		select {
		case <-stopChan:
			return "", errors.Err("interrupted by user")
		case <-time.After(ip_manager.IPCooldownPeriod):
		}
	}
}