From a3fcd67611c61e1251c9f64df3f34c1ade6696c0 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 12 Jul 2019 21:32:49 +0200 Subject: [PATCH] better handle interruptions by user refactor IP throttling in its own package --- ipManager/throttle.go | 111 ++++++++++++++++++++++++++++++++++++ manager/ytsync.go | 7 ++- sources/youtubeVideo.go | 122 +++++++++------------------------------- 3 files changed, 142 insertions(+), 98 deletions(-) create mode 100644 ipManager/throttle.go diff --git a/ipManager/throttle.go b/ipManager/throttle.go new file mode 100644 index 0000000..a123ab4 --- /dev/null +++ b/ipManager/throttle.go @@ -0,0 +1,111 @@ +package ipManager + +import ( + "github.com/asaskevich/govalidator" + log "github.com/sirupsen/logrus" + + "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/extras/stop" + + "net" + "sync" + "time" +) + +const IPCooldownPeriod = 20 * time.Second +const unbanTimeout = 3 * time.Hour + +var ipv6Pool []string +var ipv4Pool []string +var throttledIPs map[string]bool +var ipLastUsed map[string]time.Time +var ipMutex sync.Mutex +var stopper = stop.New() + +func SignalShutdown() { + stopper.Stop() +} + +func GetNextIP(ipv6 bool) (string, error) { + ipMutex.Lock() + defer ipMutex.Unlock() + if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "", errors.Err(err) + } + + for _, address := range addrs { + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) { + ipv6Pool = append(ipv6Pool, ipnet.IP.String()) + } else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) { + ipv4Pool = append(ipv4Pool, ipnet.IP.String()) + } + } + } + } + nextIP := "" + if ipv6 { + nextIP = getLeastUsedIP(ipv6Pool) + } else { + nextIP = getLeastUsedIP(ipv4Pool) + } + if nextIP == "" { + return "throttled", errors.Err("all IPs are throttled") + } + lastUse := ipLastUsed[nextIP] + if time.Since(lastUse) < IPCooldownPeriod { + time.Sleep(IPCooldownPeriod - time.Since(lastUse)) + } + + ipLastUsed[nextIP] = time.Now() + return nextIP, nil +} + +func getLeastUsedIP(ipPool []string) string { + nextIP := "" + veryLastUse := time.Now() + for _, ip := range ipPool { + isThrottled, _ := throttledIPs[ip] + if isThrottled { + continue + } + lastUse := ipLastUsed[ip] + if lastUse.Before(veryLastUse) { + nextIP = ip + veryLastUse = lastUse + } + } + return nextIP +} + +func SetIpThrottled(ip string, stopGrp *stop.Group) { + ipMutex.Lock() + defer ipMutex.Unlock() + isThrottled, _ := throttledIPs[ip] + if isThrottled { + return + } + throttledIPs[ip] = true + log.Printf("%s set to throttled", ip) + + stopper.Add(1) + go func() { + defer stopper.Done() + unbanTimer := time.NewTimer(unbanTimeout) + for { + select { + case <-unbanTimer.C: + throttledIPs[ip] = false + log.Printf("%s set back to not throttled", ip) + return + case <-stopGrp.Ch(): + unbanTimer.Stop() + return + default: + time.Sleep(5 * time.Second) + } + } + }() +} diff --git a/manager/ytsync.go b/manager/ytsync.go index 1b852cb..8a7c927 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -649,6 +649,9 @@ func (s *Sync) startWorker(workerNum int) { if err != nil { logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error()) log.Errorln(logMsg) + if strings.Contains(err.Error(), "interrupted by user") { + return + } fatalErrors := []string{ ":5279: read: connection reset by peer", "no space left on device", @@ -811,7 +814,7 @@ func (s *Sync) enqueueYoutubeVideos() error { return errors.Prefix("error getting videos info", err) } for _, item := range videosListResponse.Items { - videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig())) + videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp)) } log.Infof("Got info for %d videos from youtube API", len(videos)) @@ -827,7 +830,7 @@ func (s *Sync) enqueueYoutubeVideos() error { } _, ok := playlistMap[k] if !ok { - videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig())) + videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp)) } } diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 22dfbb1..526b115 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -4,7 +4,6 @@ import ( "fmt" "io/ioutil" "math" - "net" "os" "os/exec" "path/filepath" @@ -19,13 +18,13 @@ import ( "github.com/lbryio/lbry.go/extras/stop" "github.com/lbryio/lbry.go/extras/util" + "github.com/lbryio/ytsync/ipManager" "github.com/lbryio/ytsync/namer" "github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/tagsManager" "github.com/lbryio/ytsync/thumbs" "github.com/ChannelMeter/iso8601duration" - "github.com/asaskevich/govalidator" "github.com/aws/aws-sdk-go/aws" "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" @@ -50,11 +49,9 @@ type YoutubeVideo struct { lbryChannelID string mocked bool walletLock *sync.RWMutex + stopGroup *stop.Group } -const IPCooldownPeriod = 20 * time.Second -const unbanTimeout = 3 * time.Hour - var youtubeCategories = map[string]string{ "1": "film & animation", "2": "autos & vehicles", @@ -90,7 +87,7 @@ var youtubeCategories = map[string]string{ "44": "trailers", } -func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config) *YoutubeVideo { +func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo { publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors return &YoutubeVideo{ id: videoData.Id, @@ -103,9 +100,10 @@ func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPositio awsConfig: awsConfig, mocked: false, youtubeChannelID: videoData.Snippet.ChannelId, + stopGroup: stopGroup, } } -func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config) *YoutubeVideo { +func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo { return &YoutubeVideo{ id: videoID, playlistPosition: 0, @@ -113,6 +111,7 @@ func NewMockedVideo(directory string, videoID string, youtubeChannelID string, a awsConfig: awsConfig, mocked: true, youtubeChannelID: youtubeChannelID, + stopGroup: stopGroup, } } @@ -176,93 +175,6 @@ func (v *YoutubeVideo) getAbbrevDescription() string { return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription } -var ipv6Pool []string -var ipv4Pool []string -var throttledIPs map[string]bool -var ipLastUsed map[string]time.Time -var ipMutex sync.Mutex - -func getNextIP(ipv6 bool) (string, error) { - ipMutex.Lock() - defer ipMutex.Unlock() - if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "", errors.Err(err) - } - - for _, address := range addrs { - if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) { - ipv6Pool = append(ipv6Pool, ipnet.IP.String()) - } else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) { - ipv4Pool = append(ipv4Pool, ipnet.IP.String()) - } - } - } - } - nextIP := "" - if ipv6 { - nextIP = getLeastUsedIP(ipv6Pool) - } else { - nextIP = getLeastUsedIP(ipv4Pool) - } - if nextIP == "" { - return "", errors.Err("all IPs are throttled") - } - lastUse := ipLastUsed[nextIP] - if time.Since(lastUse) < IPCooldownPeriod { - time.Sleep(IPCooldownPeriod - time.Since(lastUse)) - } - - ipLastUsed[nextIP] = time.Now() - return nextIP, nil -} -func getLeastUsedIP(ipPool []string) string { - nextIP := "" - veryLastUse := time.Now() - for _, ip := range ipPool { - isThrottled, _ := throttledIPs[ip] - if isThrottled { - continue - } - lastUse := ipLastUsed[ip] - if lastUse.Before(veryLastUse) { - nextIP = ip - veryLastUse = lastUse - } - } - return nextIP -} - -func setIpThrottled(ip string, stopGrp *stop.Group) { - ipMutex.Lock() - defer ipMutex.Unlock() - isThrottled, _ := throttledIPs[ip] - if isThrottled { - return - } - throttledIPs[ip] = true - log.Printf("%s set to throttled", ip) - - go func() { - unbanTimer := time.NewTimer(unbanTimeout) - for { - select { - case <-unbanTimer.C: - throttledIPs[ip] = false - log.Printf("%s set back to not throttled", ip) - return - case <-stopGrp.Ch(): - unbanTimer.Stop() - return - default: - time.Sleep(5 * time.Second) - } - } - }() -} - func (v *YoutubeVideo) download(useIPv6 bool) error { videoPath := v.getFullPath() @@ -290,9 +202,24 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { "--merge-output-format", "mp4", } - sourceAddress, err := getNextIP(useIPv6) + sourceAddress, err := ipManager.GetNextIP(useIPv6) if err != nil { - return errors.Err(err) + if sourceAddress == "throttled" { + for { + select { + case <-v.stopGroup.Ch(): + return errors.Err("interrupted by user") + default: + time.Sleep(20 * time.Second) + sourceAddress, err = ipManager.GetNextIP(useIPv6) + if err == nil { + break + } + } + } + } else { + return errors.Err(err) + } } if useIPv6 { log.Infof("using IPv6: %s", sourceAddress) @@ -332,6 +259,9 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { if err = cmd.Wait(); err != nil { if strings.Contains(err.Error(), "exit status 1") { + if strings.Contains(string(errorLog), "HTTP Error 429") { + ipManager.SetIpThrottled(sourceAddress, v.stopGroup) + } return errors.Err(string(errorLog)) } return errors.Err(err)