change ip throttling management

This commit is contained in:
Niko Storni 2019-12-10 23:02:56 +01:00
parent caca13de61
commit 36f539ef5d
4 changed files with 204 additions and 110 deletions

View file

@ -1,122 +1,191 @@
package ip_manager package ip_manager
import ( import (
"github.com/asaskevich/govalidator"
"github.com/lbryio/ytsync/util"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"net" "net"
"sort"
"sync" "sync"
"time" "time"
"github.com/asaskevich/govalidator"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/ytsync/util"
log "github.com/sirupsen/logrus"
) )
const IPCooldownPeriod = 35 * time.Second const IPCooldownPeriod = 35 * time.Second
const unbanTimeout = 3 * time.Hour const unbanTimeout = 3 * time.Hour
var ipv6Pool []string
var ipv4Pool []string
var throttledIPs map[string]bool
var ipInUse map[string]bool
var ipLastUsed map[string]time.Time
var ipMutex sync.Mutex
var stopper = stop.New() var stopper = stop.New()
func GetNextIP(ipv6 bool) (string, error) { type IPPool struct {
ipMutex.Lock() ips []throttledIP
defer ipMutex.Unlock() lock *sync.Mutex
if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 { }
throttledIPs = make(map[string]bool)
ipInUse = make(map[string]bool)
ipLastUsed = make(map[string]time.Time)
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", errors.Err(err)
}
for _, address := range addrs { type throttledIP struct {
if ipnet, ok := address.(*net.IPNet); ok && ipnet.IP.IsGlobalUnicast() { IP string
if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) { LastUse time.Time
ipv6Pool = append(ipv6Pool, ipnet.IP.String()) Throttled bool
ipLastUsed[ipnet.IP.String()] = time.Now().Add(-IPCooldownPeriod) InUse bool
} else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) { }
ipv4Pool = append(ipv4Pool, ipnet.IP.String())
ipLastUsed[ipnet.IP.String()] = time.Now().Add(-IPCooldownPeriod) var ipPoolInstance *IPPool
}
func GetIPPool() (*IPPool, error) {
if ipPoolInstance != nil {
return ipPoolInstance, nil
}
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, errors.Err(err)
}
var pool []throttledIP
ipv6Added := false
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && ipnet.IP.IsGlobalUnicast() {
if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) && !ipv6Added {
pool = append(pool, throttledIP{
IP: ipnet.IP.String(),
LastUse: time.Time{},
})
ipv6Added = true
} else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) {
pool = append(pool, throttledIP{
IP: ipnet.IP.String(),
LastUse: time.Time{},
})
} }
} }
} }
nextIP := "" ipPoolInstance = &IPPool{
if ipv6 { ips: pool,
nextIP = getLeastUsedIP(ipv6Pool) lock: &sync.Mutex{},
} else {
nextIP = getLeastUsedIP(ipv4Pool)
} }
if nextIP == "" { return ipPoolInstance, nil
return "throttled", errors.Err("all IPs are throttled")
}
lastUse := ipLastUsed[nextIP]
if time.Since(lastUse) < IPCooldownPeriod {
log.Debugf("The IP %s is too hot, waiting for %.1f seconds before continuing", nextIP, (IPCooldownPeriod - time.Since(lastUse)).Seconds())
time.Sleep(IPCooldownPeriod - time.Since(lastUse))
}
ipInUse[nextIP] = true
return nextIP, nil
} }
func ReleaseIP(ip string) { // AllThrottled checks whether the IPs provided are all throttled.
ipMutex.Lock() // returns false if at least one IP is not throttled
defer ipMutex.Unlock() // Not thread safe, should use locking when called
ipLastUsed[ip] = time.Now() func AllThrottled(ips []throttledIP) bool {
ipInUse[ip] = false for _, i := range ips {
} if !i.Throttled {
return false
func getLeastUsedIP(ipPool []string) string {
nextIP := ""
veryLastUse := time.Now()
for _, ip := range ipPool {
isThrottled := throttledIPs[ip]
if isThrottled {
continue
}
inUse := ipInUse[ip]
if inUse {
continue
}
lastUse := ipLastUsed[ip]
if lastUse.Before(veryLastUse) {
nextIP = ip
veryLastUse = lastUse
} }
} }
return nextIP return true
} }
func SetIpThrottled(ip string, stopGrp *stop.Group) { // AllInUse checks whether the IPs provided are all currently in use.
ipMutex.Lock() // returns false if at least one IP is not in use AND is not throttled
isThrottled := throttledIPs[ip] // Not thread safe, should use locking when called
if isThrottled { func AllInUse(ips []throttledIP) bool {
return for _, i := range ips {
if !i.InUse && !i.Throttled {
return false
}
}
return true
}
func (i *IPPool) ReleaseIP(ip string) {
i.lock.Lock()
defer i.lock.Unlock()
for j, _ := range i.ips {
localIP := &i.ips[j]
if localIP.IP == ip {
localIP.InUse = false
localIP.LastUse = time.Now()
break
}
}
}
func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) {
i.lock.Lock()
defer i.lock.Unlock()
var tIP *throttledIP
for j, _ := range i.ips {
localIP := &i.ips[j]
if localIP.IP == ip {
if localIP.Throttled {
return
}
localIP.Throttled = true
tIP = localIP
break
}
} }
throttledIPs[ip] = true
ipMutex.Unlock()
util.SendErrorToSlack("%s set to throttled", ip) util.SendErrorToSlack("%s set to throttled", ip)
stopper.Add(1) stopper.Add(1)
go func() { go func(tIP *throttledIP) {
defer stopper.Done() defer stopper.Done()
unbanTimer := time.NewTimer(unbanTimeout) unbanTimer := time.NewTimer(unbanTimeout)
select { select {
case <-unbanTimer.C: case <-unbanTimer.C:
ipMutex.Lock() i.lock.Lock()
throttledIPs[ip] = false tIP.Throttled = false
ipMutex.Unlock() i.lock.Unlock()
util.SendInfoToSlack("%s set back to not throttled", ip) util.SendInfoToSlack("%s set back to not throttled", ip)
case <-stopGrp.Ch(): case <-stopGrp.Ch():
unbanTimer.Stop() unbanTimer.Stop()
} }
}() }(tIP)
}
var ErrAllInUse = errors.Base("all IPs are in use, try again")
var ErrAllThrottled = errors.Base("all IPs are throttled")
var ErrResourceLock = errors.Base("error getting next ip, did you forget to lock on the resource?")
func (i *IPPool) nextIP() (*throttledIP, error) {
i.lock.Lock()
defer i.lock.Unlock()
sort.Slice(i.ips, func(j, k int) bool {
return i.ips[j].LastUse.Before(i.ips[j].LastUse)
})
if !AllThrottled(i.ips) {
if AllInUse(i.ips) {
return nil, errors.Err(ErrAllInUse)
}
var nextIP *throttledIP
for j, _ := range i.ips {
ip := &i.ips[j]
if ip.InUse || ip.Throttled {
continue
}
nextIP = ip
break
}
if nextIP == nil {
return nil, errors.Err(ErrResourceLock)
}
nextIP.InUse = true
return nextIP, nil
}
return nil, errors.Err(ErrAllThrottled)
}
func (i *IPPool) GetIP() (string, error) {
for {
ip, err := i.nextIP()
if err != nil {
if errors.Is(err, ErrAllInUse) {
time.Sleep(5 * time.Second)
continue
} else if errors.Is(err, ErrAllThrottled) {
return "throttled", err
}
return "", err
}
if time.Since(ip.LastUse) < IPCooldownPeriod {
log.Debugf("The IP %s is too hot, waiting for %.1f seconds before continuing", ip.IP, (IPCooldownPeriod - time.Since(ip.LastUse)).Seconds())
time.Sleep(IPCooldownPeriod - time.Since(ip.LastUse))
}
return ip.IP, nil
}
} }

View file

@ -0,0 +1,26 @@
package ip_manager
import (
"testing"
)
func TestAll(t *testing.T) {
pool, err := GetIPPool()
if err != nil {
t.Fatal(err)
}
for range pool.ips {
ip, err := pool.GetIP()
if err != nil {
t.Fatal(err)
}
t.Log(ip)
}
next, err := pool.nextIP()
if err != nil {
t.Logf("%s", err.Error())
} else {
t.Fatal(next)
}
}

View file

@ -14,6 +14,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/ytsync/ip_manager"
"github.com/lbryio/ytsync/namer" "github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources" "github.com/lbryio/ytsync/sources"
@ -963,6 +964,10 @@ func (s *Sync) enqueueYoutubeVideos() error {
} }
var videos []video var videos []video
ipPool, err := ip_manager.GetIPPool()
if err != nil {
return err
}
playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50) playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
nextPageToken := "" nextPageToken := ""
for { for {
@ -1000,7 +1005,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
return errors.Prefix("error getting videos info", err) return errors.Prefix("error getting videos info", err)
} }
for _, item := range videosListResponse.Items { for _, item := range videosListResponse.Items {
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp)) videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp, ipPool))
} }
log.Infof("Got info for %d videos from youtube API", len(videos)) log.Infof("Got info for %d videos from youtube API", len(videos))
@ -1016,7 +1021,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
} }
_, ok := playlistMap[k] _, ok := playlistMap[k]
if !ok { if !ok {
videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp)) videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp, ipPool))
} }
} }

View file

@ -50,6 +50,7 @@ type YoutubeVideo struct {
mocked bool mocked bool
walletLock *sync.RWMutex walletLock *sync.RWMutex
stopGroup *stop.Group stopGroup *stop.Group
pool *ip_manager.IPPool
} }
var youtubeCategories = map[string]string{ var youtubeCategories = map[string]string{
@ -87,7 +88,7 @@ var youtubeCategories = map[string]string{
"44": "trailers", "44": "trailers",
} }
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo { func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
return &YoutubeVideo{ return &YoutubeVideo{
id: videoData.Id, id: videoData.Id,
@ -101,9 +102,10 @@ func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPositio
mocked: false, mocked: false,
youtubeChannelID: videoData.Snippet.ChannelId, youtubeChannelID: videoData.Snippet.ChannelId,
stopGroup: stopGroup, stopGroup: stopGroup,
pool: pool,
} }
} }
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo { func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
return &YoutubeVideo{ return &YoutubeVideo{
id: videoID, id: videoID,
playlistPosition: 0, playlistPosition: 0,
@ -112,6 +114,7 @@ func NewMockedVideo(directory string, videoID string, youtubeChannelID string, a
mocked: true, mocked: true,
youtubeChannelID: youtubeChannelID, youtubeChannelID: youtubeChannelID,
stopGroup: stopGroup, stopGroup: stopGroup,
pool: pool,
} }
} }
@ -220,7 +223,8 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
fmt.Sprintf("duration <= %d", int(math.Round(v.maxVideoLength*3600))), fmt.Sprintf("duration <= %d", int(math.Round(v.maxVideoLength*3600))),
) )
} }
sourceAddress, err := ip_manager.GetNextIP(useIPv6)
sourceAddress, err := v.pool.GetIP()
if err != nil { if err != nil {
if sourceAddress == "throttled" { if sourceAddress == "throttled" {
for { for {
@ -231,8 +235,8 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
} }
time.Sleep(ip_manager.IPCooldownPeriod) time.Sleep(ip_manager.IPCooldownPeriod)
sourceAddress, err = ip_manager.GetNextIP(useIPv6) sourceAddress, err = v.pool.GetIP()
if err == nil { if err == nil { //TODO: This is possibly not 100% right, but it works so I'm not touching it...
break break
} }
} }
@ -240,23 +244,13 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
return errors.Err(err) return errors.Err(err)
} }
} }
defer ip_manager.ReleaseIP(sourceAddress) defer v.pool.ReleaseIP(sourceAddress)
if useIPv6 { ytdlArgs = append(ytdlArgs,
log.Infof("using IPv6: %s", sourceAddress) "--source-address",
ytdlArgs = append(ytdlArgs, sourceAddress,
"-6", "https://www.youtube.com/watch?v="+v.ID(),
"--source-address", )
sourceAddress,
)
} else {
log.Infof("using IPv4: %s", sourceAddress)
ytdlArgs = append(ytdlArgs,
"-4",
"--source-address",
sourceAddress,
)
}
ytdlArgs = append(ytdlArgs, "https://www.youtube.com/watch?v="+v.ID())
runcmd: runcmd:
argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][height<="+qualities[qualityIndex]+"]+bestaudio[ext!=webm]") argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][height<="+qualities[qualityIndex]+"]+bestaudio[ext!=webm]")
cmd := exec.Command("youtube-dl", argsWithFilters...) cmd := exec.Command("youtube-dl", argsWithFilters...)
@ -282,7 +276,7 @@ runcmd:
if err = cmd.Wait(); err != nil { if err = cmd.Wait(); err != nil {
if strings.Contains(err.Error(), "exit status 1") { if strings.Contains(err.Error(), "exit status 1") {
if strings.Contains(string(errorLog), "HTTP Error 429") { if strings.Contains(string(errorLog), "HTTP Error 429") {
ip_manager.SetIpThrottled(sourceAddress, v.stopGroup) v.pool.SetThrottled(sourceAddress, v.stopGroup)
} else if strings.Contains(string(errorLog), "giving up after 0 fragment retries") && qualityIndex < len(qualities)-1 { } else if strings.Contains(string(errorLog), "giving up after 0 fragment retries") && qualityIndex < len(qualities)-1 {
qualityIndex++ qualityIndex++
goto runcmd goto runcmd