i made a huge mess. sorry future me, when you're digging through this

This commit is contained in:
Alex Grintsvayg 2020-07-28 21:34:08 -04:00
parent a3dd3dc626
commit 0eef62b5fd
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
7 changed files with 179 additions and 69 deletions

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/url" "net/url"
"os/exec" "os/exec"
@ -15,14 +16,15 @@ import (
"github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func GetPlaylistVideoIDs(channelName string, maxVideos int) ([]string, error) { func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan) ([]string, error) {
args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"} args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist"}
ids, err := run(args, false, true) ids, err := run(args, false, true, stopChan)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -36,48 +38,51 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int) ([]string, error) {
return videoIDs, nil return videoIDs, nil
} }
func GetVideoInformation(videoID string) (*ytdl.YtdlVideo, error) { func GetVideoInformation(videoID string, stopChan stop.Chan, ip *net.TCPAddr) (*ytdl.YtdlVideo, error) {
//args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID} args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID}
//results, err := run(args, false, true) results, err := run(args, false, true, stopChan)
//if err != nil { if err != nil {
// return nil, errors.Err(err) return nil, errors.Err(err)
//} }
var video *ytdl.YtdlVideo var video *ytdl.YtdlVideo
//err = json.Unmarshal([]byte(results[0]), &video) err = json.Unmarshal([]byte(results[0]), &video)
//if err != nil { if err != nil {
// return nil, errors.Err(err) return nil, errors.Err(err)
//} }
video = &ytdl.YtdlVideo{}
// now get an accurate time // now get an accurate time
const maxTries = 5 const maxTries = 5
tries := 0 tries := 0
GetTime: GetTime:
tries++ tries++
t, err := getUploadTime(videoID) t, err := getUploadTime(videoID, ip)
if err != nil { if err != nil {
slack(":warning: Upload time error: %v", err) //slack(":warning: Upload time error: %v", err)
if tries <= maxTries && (errors.Is(err, errNotScraped) || errors.Is(err, errUploadTimeEmpty)) { if tries <= maxTries && (errors.Is(err, errNotScraped) || errors.Is(err, errUploadTimeEmpty)) {
triggerScrape(videoID) err := triggerScrape(videoID, ip)
time.Sleep(2 * time.Second) // let them scrape it if err == nil {
goto GetTime time.Sleep(2 * time.Second) // let them scrape it
goto GetTime
} else {
//slack("triggering scrape returned error: %v", err)
}
} else if !errors.Is(err, errNotScraped) && !errors.Is(err, errUploadTimeEmpty) { } else if !errors.Is(err, errNotScraped) && !errors.Is(err, errUploadTimeEmpty) {
slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err) //slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err)
return nil, errors.Err(err) return nil, errors.Err(err)
} }
// do fallback below // do fallback below
} }
slack("After all that, upload time for %s is %s", videoID, t) //slack("After all that, upload time for %s is %s", videoID, t)
if t != "" { if t != "" {
parsed, err := time.Parse("2006-01-02, 15:04:05 (MST)", t) // this will probably be UTC, but Go's timezone parsing is fucked up. it ignores the timezone in the date parsed, err := time.Parse("2006-01-02, 15:04:05 (MST)", t) // this will probably be UTC, but Go's timezone parsing is fucked up. it ignores the timezone in the date
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
slack(":exclamation: Got an accurate time for %s", videoID)
video.UploadDateForReal = parsed video.UploadDateForReal = parsed
} else { } else {
slack(":warning: Could not get accurate time for %s. Falling back to estimated time.", videoID) //slack(":warning: Could not get accurate time for %s. Falling back to time from upload ytdl: %s.", videoID, video.UploadDate)
// fall back to UploadDate from youtube-dl // fall back to UploadDate from youtube-dl
video.UploadDateForReal, err = time.Parse("20060102", video.UploadDate) video.UploadDateForReal, err = time.Parse("20060102", video.UploadDate)
if err != nil { if err != nil {
@ -96,30 +101,62 @@ func slack(format string, a ...interface{}) {
util.SendToSlack(format, a...) util.SendToSlack(format, a...)
} }
func triggerScrape(videoID string) error { func triggerScrape(videoID string, ip *net.TCPAddr) error {
slack("Triggering scrape for %s", videoID) //slack("Triggering scrape for %s", videoID)
u, err := url.Parse("https://caa.iti.gr/verify_videoV3") u, err := url.Parse("https://caa.iti.gr/verify_videoV3")
q := u.Query() q := u.Query()
q.Set("twtimeline", "0") q.Set("twtimeline", "0")
q.Set("url", "https://www.youtube.com/watch?v="+videoID) q.Set("url", "https://www.youtube.com/watch?v="+videoID)
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
slack("GET %s", u.String()) //slack("GET %s", u.String())
res, err := http.Get(u.String())
client := getClient(ip)
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return errors.Err(err)
}
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36")
res, err := client.Do(req)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
defer res.Body.Close() defer res.Body.Close()
all, err := ioutil.ReadAll(res.Body) var response struct {
spew.Dump(string(all), err) Message string `json:"message"`
Status string `json:"status"`
VideoURL string `json:"video_url"`
}
err = json.NewDecoder(res.Body).Decode(&response)
if err != nil {
return errors.Err(err)
}
switch response.Status {
case "removed_video":
return errors.Err("video previously removed from service")
case "no_video":
return errors.Err("they say 'video cannot be found'. wtf?")
default:
spew.Dump(response)
}
return nil return nil
//https://caa.iti.gr/caa/api/v4/videos/reports/h-tuxHS5lSM //https://caa.iti.gr/caa/api/v4/videos/reports/h-tuxHS5lSM
} }
func getUploadTime(videoID string) (string, error) { func getUploadTime(videoID string, ip *net.TCPAddr) (string, error) {
slack("Getting upload time for %s", videoID) //slack("Getting upload time for %s", videoID)
res, err := http.Get("https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v=" + videoID)
client := getClient(ip)
req, err := http.NewRequest(http.MethodGet, "https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v="+videoID, nil)
if err != nil {
return "", errors.Err(err)
}
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36")
res, err := client.Do(req)
if err != nil { if err != nil {
return "", errors.Err(err) return "", errors.Err(err)
} }
@ -139,6 +176,10 @@ func getUploadTime(videoID string) (string, error) {
return "", errNotScraped return "", errNotScraped
} }
if uploadTime.Status == "" && strings.HasPrefix(uploadTime.Message, "CANNOT_RETRIEVE_REPORT_FOR_VIDEO_") {
return "", errors.Err("cannot retrieve report for video")
}
if uploadTime.Time == "" { if uploadTime.Time == "" {
return "", errUploadTimeEmpty return "", errUploadTimeEmpty
} }
@ -146,7 +187,28 @@ func getUploadTime(videoID string) (string, error) {
return uploadTime.Time, nil return uploadTime.Time, nil
} }
func run(args []string, withStdErr, withStdOut bool) ([]string, error) { func getClient(ip *net.TCPAddr) *http.Client {
if ip == nil {
return http.DefaultClient
}
return &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
LocalAddr: ip,
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
}
func run(args []string, withStdErr, withStdOut bool, stopChan stop.Chan) ([]string, error) {
cmd := exec.Command("youtube-dl", args...) cmd := exec.Command("youtube-dl", args...)
logrus.Printf("Running command youtube-dl %s", strings.Join(args, " ")) logrus.Printf("Running command youtube-dl %s", strings.Join(args, " "))
@ -181,10 +243,23 @@ func run(args []string, withStdErr, withStdOut bool) ([]string, error) {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
} }
err := cmd.Wait()
if len(errorLog) > 0 { done := make(chan error, 1)
return nil, errors.Err(err) go func() {
done <- cmd.Wait()
}()
select {
case <-stopChan:
if err := cmd.Process.Kill(); err != nil {
return nil, errors.Prefix("failed to kill command after stopper cancellation", err)
}
return nil, errors.Err("canceled by stopper")
case err := <-done:
if err != nil {
return nil, errors.Prefix("youtube-dl "+strings.Join(args, " "), err)
}
} }
if len(errorLog) > 0 { if len(errorLog) > 0 {
return nil, errors.Err(string(errorLog)) return nil, errors.Err(string(errorLog))
} }

View file

@ -17,7 +17,7 @@ func TestGetPlaylistVideoIDs(t *testing.T) {
} }
func TestGetVideoInformation(t *testing.T) { func TestGetVideoInformation(t *testing.T) {
video, err := GetVideoInformation("zj7pXM9gE5M") video, err := GetVideoInformation("zj7pXM9gE5M", nil)
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
} }

View file

@ -63,21 +63,21 @@ func GetIPPool(stopGrp *stop.Group) (*IPPool, error) {
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
stopGrp: stopGrp, stopGrp: stopGrp,
} }
ticker := time.NewTicker(10 * time.Second) //ticker := time.NewTicker(10 * time.Second)
go func() { //go func() {
for { // for {
select { // select {
case <-stopGrp.Ch(): // case <-stopGrp.Ch():
return // return
case <-ticker.C: // case <-ticker.C:
ipPoolInstance.lock.RLock() // ipPoolInstance.lock.RLock()
for _, ip := range ipPoolInstance.ips { // for _, ip := range ipPoolInstance.ips {
log.Debugf("IP: %s\tInUse: %t\tVideoID: %s\tThrottled: %t\tLastUse: %.1f", ip.IP, ip.InUse, ip.UsedForVideo, ip.Throttled, time.Since(ip.LastUse).Seconds()) // log.Debugf("IP: %s\tInUse: %t\tVideoID: %s\tThrottled: %t\tLastUse: %.1f", ip.IP, ip.InUse, ip.UsedForVideo, ip.Throttled, time.Since(ip.LastUse).Seconds())
} // }
ipPoolInstance.lock.RUnlock() // ipPoolInstance.lock.RUnlock()
} // }
} // }
}() //}()
return ipPoolInstance, nil return ipPoolInstance, nil
} }
@ -108,7 +108,7 @@ func AllInUse(ips []throttledIP) bool {
func (i *IPPool) ReleaseIP(ip string) { func (i *IPPool) ReleaseIP(ip string) {
i.lock.Lock() i.lock.Lock()
defer i.lock.Unlock() defer i.lock.Unlock()
for j, _ := range i.ips { for j := range i.ips {
localIP := &i.ips[j] localIP := &i.ips[j]
if localIP.IP == ip { if localIP.IP == ip {
localIP.InUse = false localIP.InUse = false
@ -122,7 +122,7 @@ func (i *IPPool) ReleaseIP(ip string) {
func (i *IPPool) ReleaseAll() { func (i *IPPool) ReleaseAll() {
i.lock.Lock() i.lock.Lock()
defer i.lock.Unlock() defer i.lock.Unlock()
for j, _ := range i.ips { for j := range i.ips {
if i.ips[j].Throttled { if i.ips[j].Throttled {
continue continue
} }
@ -183,7 +183,7 @@ func (i *IPPool) nextIP(forVideo string) (*throttledIP, error) {
} }
var nextIP *throttledIP var nextIP *throttledIP
for j, _ := range i.ips { for j := range i.ips {
ip := &i.ips[j] ip := &i.ips[j]
if ip.InUse || ip.Throttled { if ip.InUse || ip.Throttled {
continue continue

21
main.go
View file

@ -7,10 +7,8 @@ import (
"os" "os"
"time" "time"
"github.com/davecgh/go-spew/spew"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/downloader"
"github.com/lbryio/ytsync/v5/manager" "github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/sdk"
ytUtils "github.com/lbryio/ytsync/v5/util" ytUtils "github.com/lbryio/ytsync/v5/util"
@ -40,8 +38,23 @@ var (
) )
func main() { func main() {
spew.Dump(downloader.GetVideoInformation("oahaMa3XB0k")) //grp := stop.New()
return //ipPool, err := ip_manager.GetIPPool(grp)
//if err != nil {
// panic(err)
//}
//
//videoID := "vtIzMaLkCaM"
//
//ip, err := ipPool.GetIP(videoID)
//if err != nil {
// panic(err)
//}
//
//spew.Dump(ip)
//
//spew.Dump(downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)}))
//return
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)

View file

@ -234,7 +234,7 @@ func (s *SyncManager) Start() error {
} }
shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server") shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server")
if !shouldNotCount { if !shouldNotCount {
logUtils.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) logUtils.SendInfoToSlack("A non fatal error was reported by the sync process.\n%s", errors.FullTrace(err))
} }
} }
err = blobs_reflector.ReflectAndClean() err = blobs_reflector.ReflectAndClean()

View file

@ -253,6 +253,8 @@ func (s *Sync) FullCycle() (e error) {
util.SendToSlack("got interrupt, shutting down") util.SendToSlack("got interrupt, shutting down")
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop() s.grp.Stop()
time.Sleep(5 * time.Second)
debug.PrintStack() // so we can figure out what's not stopping
}() }()
err := s.setStatusSyncing() err := s.setStatusSyncing()
if err != nil { if err != nil {
@ -854,6 +856,12 @@ func (s *Sync) startWorker(workerNum int) {
tryCount := 0 tryCount := 0
for { for {
select { // check again inside the loop so this dies faster
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
}
tryCount++ tryCount++
err := s.processVideo(v) err := s.processVideo(v)
@ -988,10 +996,10 @@ func (s *Sync) enqueueYoutubeVideos() error {
return err return err
} }
videos, err := ytapi.GetVideosToSync(s.APIConfig.YoutubeAPIKey, s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{ videos, err := ytapi.GetVideosToSync(s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{
VideoDir: s.videoDirectory, VideoDir: s.videoDirectory,
S3Config: s.Manager.GetS3AWSConfig(), S3Config: s.Manager.GetS3AWSConfig(),
Grp: s.grp, Stopper: s.grp,
IPPool: ipPool, IPPool: ipPool,
}) })
if err != nil { if err != nil {

View file

@ -45,19 +45,19 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
type VideoParams struct { type VideoParams struct {
VideoDir string VideoDir string
S3Config aws.Config S3Config aws.Config
Grp *stop.Group Stopper *stop.Group
IPPool *ip_manager.IPPool IPPool *ip_manager.IPPool
} }
var mostRecentlyFailedChannel string // TODO: fix this hack! var mostRecentlyFailedChannel string // TODO: fix this hack!
func GetVideosToSync(apiKey, channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams) ([]Video, error) { func GetVideosToSync(channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams) ([]Video, error) {
var videos []Video var videos []Video
if quickSync { if quickSync {
maxVideos = 50 maxVideos = 50
} }
videoIDs, err := downloader.GetPlaylistVideoIDs(channelID, maxVideos) videoIDs, err := downloader.GetPlaylistVideoIDs(channelID, maxVideos, videoParams.Stopper.Ch())
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -76,14 +76,14 @@ func GetVideosToSync(apiKey, channelID string, syncedVideos map[string]sdk.Synce
mostRecentlyFailedChannel = channelID mostRecentlyFailedChannel = channelID
} }
vids, err := getVideos(videoIDs) vids, err := getVideos(videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, item := range vids { for _, item := range vids {
positionInList := playlistMap[item.ID] positionInList := playlistMap[item.ID]
videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Grp, videoParams.IPPool) videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -95,7 +95,7 @@ func GetVideosToSync(apiKey, channelID string, syncedVideos map[string]sdk.Synce
continue continue
} }
if _, ok := playlistMap[k]; !ok { if _, ok := playlistMap[k]; !ok {
videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Grp, videoParams.IPPool)) videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool))
} }
} }
@ -143,6 +143,7 @@ func CountVideosInChannel(channelID string) (int, error) {
func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.ChannelBrandingSettings, error) { func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.ChannelBrandingSettings, error) {
return nil, nil, errors.Err("ChannelInfo doesn't work yet because we're focused on existing channels") return nil, nil, errors.Err("ChannelInfo doesn't work yet because we're focused on existing channels")
service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}}) service, err := ytlib.New(&http.Client{Transport: &transport.APIKey{Key: apiKey}})
if err != nil { if err != nil {
return nil, nil, errors.Prefix("error creating YouTube service", err) return nil, nil, errors.Prefix("error creating YouTube service", err)
@ -160,14 +161,27 @@ func ChannelInfo(apiKey, channelID string) (*ytlib.ChannelSnippet, *ytlib.Channe
return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil return response.Items[0].Snippet, response.Items[0].BrandingSettings, nil
} }
func getVideos(videoIDs []string) ([]*ytdl.YtdlVideo, error) { func getVideos(videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) {
var videos []*ytdl.YtdlVideo var videos []*ytdl.YtdlVideo
for _, videoID := range videoIDs { for _, videoID := range videoIDs {
video, err := downloader.GetVideoInformation(videoID) select {
case <-stopChan:
return videos, errors.Err("canceled by stopper")
default:
}
//ip, err := ipPool.GetIP(videoID)
//if err != nil {
// return nil, err
//}
//video, err := downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)})
video, err := downloader.GetVideoInformation(videoID, stopChan, nil)
if err != nil { if err != nil {
//ipPool.ReleaseIP(ip)
return nil, errors.Err(err) return nil, errors.Err(err)
} }
videos = append(videos, video) videos = append(videos, video)
//ipPool.ReleaseIP(ip)
} }
return videos, nil return videos, nil
} }