add ip throttling and better pooling
fix nil pointer dereference
This commit is contained in:
parent
fa24d83ae9
commit
ae1ffb60c5
2 changed files with 84 additions and 19 deletions
|
@ -54,7 +54,7 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC
|
||||||
Data []YoutubeChannel `json:"data"`
|
Data []YoutubeChannel `json:"data"`
|
||||||
}
|
}
|
||||||
endpoint := a.ApiURL + "/yt/jobs"
|
endpoint := a.ApiURL + "/yt/jobs"
|
||||||
res, _ := http.PostForm(endpoint, url.Values{
|
res, err := http.PostForm(endpoint, url.Values{
|
||||||
"auth_token": {a.ApiToken},
|
"auth_token": {a.ApiToken},
|
||||||
"sync_status": {status},
|
"sync_status": {status},
|
||||||
"min_videos": {strconv.Itoa(1)},
|
"min_videos": {strconv.Itoa(1)},
|
||||||
|
@ -63,10 +63,13 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC
|
||||||
"sync_server": {a.HostName},
|
"sync_server": {a.HostName},
|
||||||
"channel_id": {cp.YoutubeChannelID},
|
"channel_id": {cp.YoutubeChannelID},
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Err(err)
|
||||||
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
var response apiJobsResponse
|
var response apiJobsResponse
|
||||||
err := json.Unmarshal(body, &response)
|
err = json.Unmarshal(body, &response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/extras/errors"
|
"github.com/lbryio/lbry.go/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
||||||
|
"github.com/lbryio/lbry.go/extras/stop"
|
||||||
"github.com/lbryio/lbry.go/extras/util"
|
"github.com/lbryio/lbry.go/extras/util"
|
||||||
|
|
||||||
"github.com/lbryio/ytsync/namer"
|
"github.com/lbryio/ytsync/namer"
|
||||||
|
@ -51,7 +52,8 @@ type YoutubeVideo struct {
|
||||||
walletLock *sync.RWMutex
|
walletLock *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
const reflectorURL = "http://blobs.lbry.io/"
|
const IPCooldownPeriod = 20 * time.Second
|
||||||
|
const unbanTimeout = 3 * time.Hour
|
||||||
|
|
||||||
var youtubeCategories = map[string]string{
|
var youtubeCategories = map[string]string{
|
||||||
"1": "film & animation",
|
"1": "film & animation",
|
||||||
|
@ -174,15 +176,16 @@ func (v *YoutubeVideo) getAbbrevDescription() string {
|
||||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription
|
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription
|
||||||
}
|
}
|
||||||
|
|
||||||
var ipPool []string
|
var ipv6Pool []string
|
||||||
var IPIndex int
|
var ipv4Pool []string
|
||||||
|
var throttledIPs map[string]bool
|
||||||
|
var ipLastUsed map[string]time.Time
|
||||||
var ipMutex sync.Mutex
|
var ipMutex sync.Mutex
|
||||||
|
|
||||||
func getNextIP() (string, error) {
|
func getNextIP(ipv6 bool) (string, error) {
|
||||||
ipMutex.Lock()
|
ipMutex.Lock()
|
||||||
defer ipMutex.Unlock()
|
defer ipMutex.Unlock()
|
||||||
if len(ipPool) < 1 {
|
if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 {
|
||||||
IPIndex = 0
|
|
||||||
addrs, err := net.InterfaceAddrs()
|
addrs, err := net.InterfaceAddrs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errors.Err(err)
|
return "", errors.Err(err)
|
||||||
|
@ -191,19 +194,74 @@ func getNextIP() (string, error) {
|
||||||
for _, address := range addrs {
|
for _, address := range addrs {
|
||||||
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||||||
if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) {
|
if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) {
|
||||||
ipPool = append(ipPool, ipnet.IP.String())
|
ipv6Pool = append(ipv6Pool, ipnet.IP.String())
|
||||||
|
} else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) {
|
||||||
|
ipv4Pool = append(ipv4Pool, ipnet.IP.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nextIP := ipPool[IPIndex]
|
nextIP := ""
|
||||||
if IPIndex == len(ipPool)-1 {
|
if ipv6 {
|
||||||
IPIndex = 0
|
nextIP = getLeastUsedIP(ipv6Pool)
|
||||||
} else {
|
} else {
|
||||||
IPIndex++
|
nextIP = getLeastUsedIP(ipv4Pool)
|
||||||
}
|
}
|
||||||
|
if nextIP == "" {
|
||||||
|
return "", errors.Err("all IPs are throttled")
|
||||||
|
}
|
||||||
|
lastUse := ipLastUsed[nextIP]
|
||||||
|
if time.Since(lastUse) < IPCooldownPeriod {
|
||||||
|
time.Sleep(IPCooldownPeriod - time.Since(lastUse))
|
||||||
|
}
|
||||||
|
|
||||||
|
ipLastUsed[nextIP] = time.Now()
|
||||||
return nextIP, nil
|
return nextIP, nil
|
||||||
}
|
}
|
||||||
|
func getLeastUsedIP(ipPool []string) string {
|
||||||
|
nextIP := ""
|
||||||
|
veryLastUse := time.Now()
|
||||||
|
for _, ip := range ipPool {
|
||||||
|
isThrottled, _ := throttledIPs[ip]
|
||||||
|
if isThrottled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastUse := ipLastUsed[ip]
|
||||||
|
if lastUse.Before(veryLastUse) {
|
||||||
|
nextIP = ip
|
||||||
|
veryLastUse = lastUse
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nextIP
|
||||||
|
}
|
||||||
|
|
||||||
|
func setIpThrottled(ip string, stopGrp *stop.Group) {
|
||||||
|
ipMutex.Lock()
|
||||||
|
defer ipMutex.Unlock()
|
||||||
|
isThrottled, _ := throttledIPs[ip]
|
||||||
|
if isThrottled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
throttledIPs[ip] = true
|
||||||
|
log.Printf("%s set to throttled", ip)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
unbanTimer := time.NewTimer(unbanTimeout)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-unbanTimer.C:
|
||||||
|
throttledIPs[ip] = false
|
||||||
|
log.Printf("%s set back to not throttled", ip)
|
||||||
|
return
|
||||||
|
case <-stopGrp.Ch():
|
||||||
|
unbanTimer.Stop()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func (v *YoutubeVideo) download(useIPv6 bool) error {
|
func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
videoPath := v.getFullPath()
|
videoPath := v.getFullPath()
|
||||||
|
@ -232,11 +290,11 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
"--merge-output-format",
|
"--merge-output-format",
|
||||||
"mp4",
|
"mp4",
|
||||||
}
|
}
|
||||||
|
sourceAddress, err := getNextIP(useIPv6)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
if useIPv6 {
|
if useIPv6 {
|
||||||
sourceAddress, err := getNextIP()
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
log.Infof("using IPv6: %s", sourceAddress)
|
log.Infof("using IPv6: %s", sourceAddress)
|
||||||
ytdlArgs = append(ytdlArgs,
|
ytdlArgs = append(ytdlArgs,
|
||||||
"-6",
|
"-6",
|
||||||
|
@ -244,8 +302,12 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
sourceAddress,
|
sourceAddress,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
log.Infoln("using IPv4")
|
log.Infoln("using IPv4: %s", sourceAddress)
|
||||||
ytdlArgs = append(ytdlArgs, "-4")
|
ytdlArgs = append(ytdlArgs,
|
||||||
|
"-4",
|
||||||
|
"--source-address",
|
||||||
|
sourceAddress,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
ytdlArgs = append(ytdlArgs, "https://www.youtube.com/watch?v="+v.ID())
|
ytdlArgs = append(ytdlArgs, "https://www.youtube.com/watch?v="+v.ID())
|
||||||
cmd := exec.Command("youtube-dl", ytdlArgs...)
|
cmd := exec.Command("youtube-dl", ytdlArgs...)
|
||||||
|
|
Loading…
Add table
Reference in a new issue