From ae1ffb60c5c7e0aef5d8c5626923ad357236f4e3 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 12 Jul 2019 20:42:44 +0200 Subject: [PATCH] add ip throttling and better pooling fix nil pointer dereference --- sdk/api.go | 7 ++- sources/youtubeVideo.go | 96 +++++++++++++++++++++++++++++++++-------- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/sdk/api.go b/sdk/api.go index d8c7d47..821f8a8 100644 --- a/sdk/api.go +++ b/sdk/api.go @@ -54,7 +54,7 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC Data []YoutubeChannel `json:"data"` } endpoint := a.ApiURL + "/yt/jobs" - res, _ := http.PostForm(endpoint, url.Values{ + res, err := http.PostForm(endpoint, url.Values{ "auth_token": {a.ApiToken}, "sync_status": {status}, "min_videos": {strconv.Itoa(1)}, @@ -63,10 +63,13 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC "sync_server": {a.HostName}, "channel_id": {cp.YoutubeChannelID}, }) + if err != nil { + return nil, errors.Err(err) + } defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) var response apiJobsResponse - err := json.Unmarshal(body, &response) + err = json.Unmarshal(body, &response) if err != nil { return nil, err } diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 9dd258d..22dfbb1 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -16,6 +16,7 @@ import ( "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/jsonrpc" + "github.com/lbryio/lbry.go/extras/stop" "github.com/lbryio/lbry.go/extras/util" "github.com/lbryio/ytsync/namer" @@ -51,7 +52,8 @@ type YoutubeVideo struct { walletLock *sync.RWMutex } -const reflectorURL = "http://blobs.lbry.io/" +const IPCooldownPeriod = 20 * time.Second +const unbanTimeout = 3 * time.Hour var youtubeCategories = map[string]string{ "1": "film & animation", @@ -174,15 +176,16 @@ func (v *YoutubeVideo) getAbbrevDescription() string { return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription } -var ipPool []string -var IPIndex int +var ipv6Pool []string +var ipv4Pool []string +var throttledIPs map[string]bool +var ipLastUsed map[string]time.Time var ipMutex sync.Mutex -func getNextIP() (string, error) { +func getNextIP(ipv6 bool) (string, error) { ipMutex.Lock() defer ipMutex.Unlock() - if len(ipPool) < 1 { - IPIndex = 0 + if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 { addrs, err := net.InterfaceAddrs() if err != nil { return "", errors.Err(err) @@ -191,19 +194,74 @@ func getNextIP() (string, error) { for _, address := range addrs { if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) { - ipPool = append(ipPool, ipnet.IP.String()) + ipv6Pool = append(ipv6Pool, ipnet.IP.String()) + } else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) { + ipv4Pool = append(ipv4Pool, ipnet.IP.String()) } } } } - nextIP := ipPool[IPIndex] - if IPIndex == len(ipPool)-1 { - IPIndex = 0 + nextIP := "" + if ipv6 { + nextIP = getLeastUsedIP(ipv6Pool) } else { - IPIndex++ + nextIP = getLeastUsedIP(ipv4Pool) } + if nextIP == "" { + return "", errors.Err("all IPs are throttled") + } + lastUse := ipLastUsed[nextIP] + if time.Since(lastUse) < IPCooldownPeriod { + time.Sleep(IPCooldownPeriod - time.Since(lastUse)) + } + + ipLastUsed[nextIP] = time.Now() return nextIP, nil } +func getLeastUsedIP(ipPool []string) string { + nextIP := "" + veryLastUse := time.Now() + for _, ip := range ipPool { + isThrottled, _ := throttledIPs[ip] + if isThrottled { + continue + } + lastUse := ipLastUsed[ip] + if lastUse.Before(veryLastUse) { + nextIP = ip + veryLastUse = lastUse + } + } + return nextIP +} + +func setIpThrottled(ip string, stopGrp *stop.Group) { + ipMutex.Lock() + defer ipMutex.Unlock() + isThrottled, _ := throttledIPs[ip] + if isThrottled { + return + } + throttledIPs[ip] = true + log.Printf("%s set to throttled", ip) + + go func() { + unbanTimer := time.NewTimer(unbanTimeout) + for { + select { + case <-unbanTimer.C: + throttledIPs[ip] = false + log.Printf("%s set back to not throttled", ip) + return + case <-stopGrp.Ch(): + unbanTimer.Stop() + return + default: + time.Sleep(5 * time.Second) + } + } + }() +} func (v *YoutubeVideo) download(useIPv6 bool) error { videoPath := v.getFullPath() @@ -232,11 +290,11 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { "--merge-output-format", "mp4", } + sourceAddress, err := getNextIP(useIPv6) + if err != nil { + return errors.Err(err) + } if useIPv6 { - sourceAddress, err := getNextIP() - if err != nil { - return errors.Err(err) - } log.Infof("using IPv6: %s", sourceAddress) ytdlArgs = append(ytdlArgs, "-6", @@ -244,8 +302,12 @@ func (v *YoutubeVideo) download(useIPv6 bool) error { sourceAddress, ) } else { - log.Infoln("using IPv4") - ytdlArgs = append(ytdlArgs, "-4") + log.Infoln("using IPv4: %s", sourceAddress) + ytdlArgs = append(ytdlArgs, + "-4", + "--source-address", + sourceAddress, + ) } ytdlArgs = append(ytdlArgs, "https://www.youtube.com/watch?v="+v.ID()) cmd := exec.Command("youtube-dl", ytdlArgs...)