From 36f539ef5d9e7aaeebbaaffef630fef1527e66e8 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 10 Dec 2019 23:02:56 +0100 Subject: [PATCH] change ip throttling management --- ip_manager/throttle.go | 239 +++++++++++++++++++++++------------- ip_manager/throttle_test.go | 26 ++++ manager/ytsync.go | 9 +- sources/youtubeVideo.go | 40 +++--- 4 files changed, 204 insertions(+), 110 deletions(-) create mode 100644 ip_manager/throttle_test.go diff --git a/ip_manager/throttle.go b/ip_manager/throttle.go index 388f1b6..f10c229 100644 --- a/ip_manager/throttle.go +++ b/ip_manager/throttle.go @@ -1,122 +1,191 @@ package ip_manager import ( - "github.com/asaskevich/govalidator" - "github.com/lbryio/ytsync/util" - log "github.com/sirupsen/logrus" - - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/extras/stop" - "net" + "sort" "sync" "time" + + "github.com/asaskevich/govalidator" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/lbryio/ytsync/util" + log "github.com/sirupsen/logrus" ) const IPCooldownPeriod = 35 * time.Second const unbanTimeout = 3 * time.Hour -var ipv6Pool []string -var ipv4Pool []string -var throttledIPs map[string]bool -var ipInUse map[string]bool -var ipLastUsed map[string]time.Time -var ipMutex sync.Mutex var stopper = stop.New() -func GetNextIP(ipv6 bool) (string, error) { - ipMutex.Lock() - defer ipMutex.Unlock() - if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 { - throttledIPs = make(map[string]bool) - ipInUse = make(map[string]bool) - ipLastUsed = make(map[string]time.Time) - addrs, err := net.InterfaceAddrs() - if err != nil { - return "", errors.Err(err) - } +type IPPool struct { + ips []throttledIP + lock *sync.Mutex +} - for _, address := range addrs { - if ipnet, ok := address.(*net.IPNet); ok && ipnet.IP.IsGlobalUnicast() { - if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) { - ipv6Pool = append(ipv6Pool, ipnet.IP.String()) - ipLastUsed[ipnet.IP.String()] = time.Now().Add(-IPCooldownPeriod) - } else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) { - ipv4Pool = append(ipv4Pool, ipnet.IP.String()) - ipLastUsed[ipnet.IP.String()] = time.Now().Add(-IPCooldownPeriod) - } +type throttledIP struct { + IP string + LastUse time.Time + Throttled bool + InUse bool +} + +var ipPoolInstance *IPPool + +func GetIPPool() (*IPPool, error) { + if ipPoolInstance != nil { + return ipPoolInstance, nil + } + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil, errors.Err(err) + } + var pool []throttledIP + ipv6Added := false + for _, address := range addrs { + if ipnet, ok := address.(*net.IPNet); ok && ipnet.IP.IsGlobalUnicast() { + if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) && !ipv6Added { + pool = append(pool, throttledIP{ + IP: ipnet.IP.String(), + LastUse: time.Time{}, + }) + ipv6Added = true + } else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) { + pool = append(pool, throttledIP{ + IP: ipnet.IP.String(), + LastUse: time.Time{}, + }) } } } - nextIP := "" - if ipv6 { - nextIP = getLeastUsedIP(ipv6Pool) - } else { - nextIP = getLeastUsedIP(ipv4Pool) + ipPoolInstance = &IPPool{ + ips: pool, + lock: &sync.Mutex{}, } - if nextIP == "" { - return "throttled", errors.Err("all IPs are throttled") - } - lastUse := ipLastUsed[nextIP] - if time.Since(lastUse) < IPCooldownPeriod { - log.Debugf("The IP %s is too hot, waiting for %.1f seconds before continuing", nextIP, (IPCooldownPeriod - time.Since(lastUse)).Seconds()) - time.Sleep(IPCooldownPeriod - time.Since(lastUse)) - } - - ipInUse[nextIP] = true - return nextIP, nil + return ipPoolInstance, nil } -func ReleaseIP(ip string) { - ipMutex.Lock() - defer ipMutex.Unlock() - ipLastUsed[ip] = time.Now() - ipInUse[ip] = false -} - -func getLeastUsedIP(ipPool []string) string { - nextIP := "" - veryLastUse := time.Now() - for _, ip := range ipPool { - isThrottled := throttledIPs[ip] - if isThrottled { - continue - } - inUse := ipInUse[ip] - if inUse { - continue - } - lastUse := ipLastUsed[ip] - if lastUse.Before(veryLastUse) { - nextIP = ip - veryLastUse = lastUse +// AllThrottled checks whether the IPs provided are all throttled. +// returns false if at least one IP is not throttled +// Not thread safe, should use locking when called +func AllThrottled(ips []throttledIP) bool { + for _, i := range ips { + if !i.Throttled { + return false } } - return nextIP + return true } -func SetIpThrottled(ip string, stopGrp *stop.Group) { - ipMutex.Lock() - isThrottled := throttledIPs[ip] - if isThrottled { - return +// AllInUse checks whether the IPs provided are all currently in use. +// returns false if at least one IP is not in use AND is not throttled +// Not thread safe, should use locking when called +func AllInUse(ips []throttledIP) bool { + for _, i := range ips { + if !i.InUse && !i.Throttled { + return false + } + } + return true +} + +func (i *IPPool) ReleaseIP(ip string) { + i.lock.Lock() + defer i.lock.Unlock() + for j, _ := range i.ips { + localIP := &i.ips[j] + if localIP.IP == ip { + localIP.InUse = false + localIP.LastUse = time.Now() + break + } + } +} + +func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) { + i.lock.Lock() + defer i.lock.Unlock() + var tIP *throttledIP + for j, _ := range i.ips { + localIP := &i.ips[j] + if localIP.IP == ip { + if localIP.Throttled { + return + } + localIP.Throttled = true + tIP = localIP + break + } } - throttledIPs[ip] = true - ipMutex.Unlock() util.SendErrorToSlack("%s set to throttled", ip) stopper.Add(1) - go func() { + go func(tIP *throttledIP) { defer stopper.Done() unbanTimer := time.NewTimer(unbanTimeout) select { case <-unbanTimer.C: - ipMutex.Lock() - throttledIPs[ip] = false - ipMutex.Unlock() + i.lock.Lock() + tIP.Throttled = false + i.lock.Unlock() util.SendInfoToSlack("%s set back to not throttled", ip) case <-stopGrp.Ch(): unbanTimer.Stop() } - }() + }(tIP) +} + +var ErrAllInUse = errors.Base("all IPs are in use, try again") +var ErrAllThrottled = errors.Base("all IPs are throttled") +var ErrResourceLock = errors.Base("error getting next ip, did you forget to lock on the resource?") + +func (i *IPPool) nextIP() (*throttledIP, error) { + i.lock.Lock() + defer i.lock.Unlock() + + sort.Slice(i.ips, func(j, k int) bool { + return i.ips[j].LastUse.Before(i.ips[j].LastUse) + }) + + if !AllThrottled(i.ips) { + if AllInUse(i.ips) { + return nil, errors.Err(ErrAllInUse) + } + + var nextIP *throttledIP + for j, _ := range i.ips { + ip := &i.ips[j] + if ip.InUse || ip.Throttled { + continue + } + nextIP = ip + break + } + if nextIP == nil { + return nil, errors.Err(ErrResourceLock) + } + nextIP.InUse = true + return nextIP, nil + } + return nil, errors.Err(ErrAllThrottled) +} + +func (i *IPPool) GetIP() (string, error) { + for { + ip, err := i.nextIP() + if err != nil { + if errors.Is(err, ErrAllInUse) { + time.Sleep(5 * time.Second) + continue + } else if errors.Is(err, ErrAllThrottled) { + return "throttled", err + } + return "", err + } + if time.Since(ip.LastUse) < IPCooldownPeriod { + log.Debugf("The IP %s is too hot, waiting for %.1f seconds before continuing", ip.IP, (IPCooldownPeriod - time.Since(ip.LastUse)).Seconds()) + time.Sleep(IPCooldownPeriod - time.Since(ip.LastUse)) + } + return ip.IP, nil + } } diff --git a/ip_manager/throttle_test.go b/ip_manager/throttle_test.go new file mode 100644 index 0000000..bc2b3ba --- /dev/null +++ b/ip_manager/throttle_test.go @@ -0,0 +1,26 @@ +package ip_manager + +import ( + "testing" +) + +func TestAll(t *testing.T) { + pool, err := GetIPPool() + if err != nil { + t.Fatal(err) + } + for range pool.ips { + ip, err := pool.GetIP() + if err != nil { + t.Fatal(err) + } + t.Log(ip) + } + + next, err := pool.nextIP() + if err != nil { + t.Logf("%s", err.Error()) + } else { + t.Fatal(next) + } +} diff --git a/manager/ytsync.go b/manager/ytsync.go index c14cd68..281a1b3 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/lbryio/ytsync/ip_manager" "github.com/lbryio/ytsync/namer" "github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/sources" @@ -963,6 +964,10 @@ func (s *Sync) enqueueYoutubeVideos() error { } var videos []video + ipPool, err := ip_manager.GetIPPool() + if err != nil { + return err + } playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50) nextPageToken := "" for { @@ -1000,7 +1005,7 @@ func (s *Sync) enqueueYoutubeVideos() error { return errors.Prefix("error getting videos info", err) } for _, item := range videosListResponse.Items { - videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp)) + videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp, ipPool)) } log.Infof("Got info for %d videos from youtube API", len(videos)) @@ -1016,7 +1021,7 @@ func (s *Sync) enqueueYoutubeVideos() error { } _, ok := playlistMap[k] if !ok { - videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp)) + videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp, ipPool)) } } diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 1a6d061..7fcdc7d 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -50,6 +50,7 @@ type YoutubeVideo struct { mocked bool walletLock *sync.RWMutex stopGroup *stop.Group + pool *ip_manager.IPPool } var youtubeCategories = map[string]string{ @@ -87,7 +88,7 @@ var youtubeCategories = map[string]string{ "44": "trailers", } -func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo { +func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo { publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors return &YoutubeVideo{ id: videoData.Id, @@ -101,9 +102,10 @@ func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPositio mocked: false, youtubeChannelID: videoData.Snippet.ChannelId, stopGroup: stopGroup, + pool: pool, } } -func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo { +func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo { return &YoutubeVideo{ id: videoID, playlistPosition: 0, @@ -112,6 +114,7 @@ func NewMockedVideo(directory string, videoID string, youtubeChannelID string, a mocked: true, youtubeChannelID: youtubeChannelID, stopGroup: stopGroup, + pool: pool, } } @@ -220,7 +223,8 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { fmt.Sprintf("duration <= %d", int(math.Round(v.maxVideoLength*3600))), ) } - sourceAddress, err := ip_manager.GetNextIP(useIPv6) + + sourceAddress, err := v.pool.GetIP() if err != nil { if sourceAddress == "throttled" { for { @@ -231,8 +235,8 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { } time.Sleep(ip_manager.IPCooldownPeriod) - sourceAddress, err = ip_manager.GetNextIP(useIPv6) - if err == nil { + sourceAddress, err = v.pool.GetIP() + if err == nil { //TODO: This is possibly not 100% right, but it works so I'm not touching it... break } } @@ -240,23 +244,13 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { return errors.Err(err) } } - defer ip_manager.ReleaseIP(sourceAddress) - if useIPv6 { - log.Infof("using IPv6: %s", sourceAddress) - ytdlArgs = append(ytdlArgs, - "-6", - "--source-address", - sourceAddress, - ) - } else { - log.Infof("using IPv4: %s", sourceAddress) - ytdlArgs = append(ytdlArgs, - "-4", - "--source-address", - sourceAddress, - ) - } - ytdlArgs = append(ytdlArgs, "https://www.youtube.com/watch?v="+v.ID()) + defer v.pool.ReleaseIP(sourceAddress) + ytdlArgs = append(ytdlArgs, + "--source-address", + sourceAddress, + "https://www.youtube.com/watch?v="+v.ID(), + ) + runcmd: argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][height<="+qualities[qualityIndex]+"]+bestaudio[ext!=webm]") cmd := exec.Command("youtube-dl", argsWithFilters...) @@ -282,7 +276,7 @@ runcmd: if err = cmd.Wait(); err != nil { if strings.Contains(err.Error(), "exit status 1") { if strings.Contains(string(errorLog), "HTTP Error 429") { - ip_manager.SetIpThrottled(sourceAddress, v.stopGroup) + v.pool.SetThrottled(sourceAddress, v.stopGroup) } else if strings.Contains(string(errorLog), "giving up after 0 fragment retries") && qualityIndex < len(qualities)-1 { qualityIndex++ goto runcmd