better handle interruptions by user
refactor IP throttling in its own package
This commit is contained in:
parent
ae1ffb60c5
commit
a3fcd67611
3 changed files with 142 additions and 98 deletions
111
ipManager/throttle.go
Normal file
111
ipManager/throttle.go
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
package ipManager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/asaskevich/govalidator"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/extras/errors"
|
||||||
|
"github.com/lbryio/lbry.go/extras/stop"
|
||||||
|
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const IPCooldownPeriod = 20 * time.Second
|
||||||
|
const unbanTimeout = 3 * time.Hour
|
||||||
|
|
||||||
|
var ipv6Pool []string
|
||||||
|
var ipv4Pool []string
|
||||||
|
var throttledIPs map[string]bool
|
||||||
|
var ipLastUsed map[string]time.Time
|
||||||
|
var ipMutex sync.Mutex
|
||||||
|
var stopper = stop.New()
|
||||||
|
|
||||||
|
func SignalShutdown() {
|
||||||
|
stopper.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetNextIP(ipv6 bool) (string, error) {
|
||||||
|
ipMutex.Lock()
|
||||||
|
defer ipMutex.Unlock()
|
||||||
|
if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 {
|
||||||
|
addrs, err := net.InterfaceAddrs()
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.Err(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, address := range addrs {
|
||||||
|
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||||||
|
if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) {
|
||||||
|
ipv6Pool = append(ipv6Pool, ipnet.IP.String())
|
||||||
|
} else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) {
|
||||||
|
ipv4Pool = append(ipv4Pool, ipnet.IP.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nextIP := ""
|
||||||
|
if ipv6 {
|
||||||
|
nextIP = getLeastUsedIP(ipv6Pool)
|
||||||
|
} else {
|
||||||
|
nextIP = getLeastUsedIP(ipv4Pool)
|
||||||
|
}
|
||||||
|
if nextIP == "" {
|
||||||
|
return "throttled", errors.Err("all IPs are throttled")
|
||||||
|
}
|
||||||
|
lastUse := ipLastUsed[nextIP]
|
||||||
|
if time.Since(lastUse) < IPCooldownPeriod {
|
||||||
|
time.Sleep(IPCooldownPeriod - time.Since(lastUse))
|
||||||
|
}
|
||||||
|
|
||||||
|
ipLastUsed[nextIP] = time.Now()
|
||||||
|
return nextIP, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLeastUsedIP(ipPool []string) string {
|
||||||
|
nextIP := ""
|
||||||
|
veryLastUse := time.Now()
|
||||||
|
for _, ip := range ipPool {
|
||||||
|
isThrottled, _ := throttledIPs[ip]
|
||||||
|
if isThrottled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastUse := ipLastUsed[ip]
|
||||||
|
if lastUse.Before(veryLastUse) {
|
||||||
|
nextIP = ip
|
||||||
|
veryLastUse = lastUse
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nextIP
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetIpThrottled(ip string, stopGrp *stop.Group) {
|
||||||
|
ipMutex.Lock()
|
||||||
|
defer ipMutex.Unlock()
|
||||||
|
isThrottled, _ := throttledIPs[ip]
|
||||||
|
if isThrottled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
throttledIPs[ip] = true
|
||||||
|
log.Printf("%s set to throttled", ip)
|
||||||
|
|
||||||
|
stopper.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer stopper.Done()
|
||||||
|
unbanTimer := time.NewTimer(unbanTimeout)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-unbanTimer.C:
|
||||||
|
throttledIPs[ip] = false
|
||||||
|
log.Printf("%s set back to not throttled", ip)
|
||||||
|
return
|
||||||
|
case <-stopGrp.Ch():
|
||||||
|
unbanTimer.Stop()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
|
@ -649,6 +649,9 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error())
|
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error())
|
||||||
log.Errorln(logMsg)
|
log.Errorln(logMsg)
|
||||||
|
if strings.Contains(err.Error(), "interrupted by user") {
|
||||||
|
return
|
||||||
|
}
|
||||||
fatalErrors := []string{
|
fatalErrors := []string{
|
||||||
":5279: read: connection reset by peer",
|
":5279: read: connection reset by peer",
|
||||||
"no space left on device",
|
"no space left on device",
|
||||||
|
@ -811,7 +814,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
|
||||||
return errors.Prefix("error getting videos info", err)
|
return errors.Prefix("error getting videos info", err)
|
||||||
}
|
}
|
||||||
for _, item := range videosListResponse.Items {
|
for _, item := range videosListResponse.Items {
|
||||||
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig()))
|
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig(), s.grp))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Got info for %d videos from youtube API", len(videos))
|
log.Infof("Got info for %d videos from youtube API", len(videos))
|
||||||
|
@ -827,7 +830,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
|
||||||
}
|
}
|
||||||
_, ok := playlistMap[k]
|
_, ok := playlistMap[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig()))
|
videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig(), s.grp))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -19,13 +18,13 @@ import (
|
||||||
"github.com/lbryio/lbry.go/extras/stop"
|
"github.com/lbryio/lbry.go/extras/stop"
|
||||||
"github.com/lbryio/lbry.go/extras/util"
|
"github.com/lbryio/lbry.go/extras/util"
|
||||||
|
|
||||||
|
"github.com/lbryio/ytsync/ipManager"
|
||||||
"github.com/lbryio/ytsync/namer"
|
"github.com/lbryio/ytsync/namer"
|
||||||
"github.com/lbryio/ytsync/sdk"
|
"github.com/lbryio/ytsync/sdk"
|
||||||
"github.com/lbryio/ytsync/tagsManager"
|
"github.com/lbryio/ytsync/tagsManager"
|
||||||
"github.com/lbryio/ytsync/thumbs"
|
"github.com/lbryio/ytsync/thumbs"
|
||||||
|
|
||||||
"github.com/ChannelMeter/iso8601duration"
|
"github.com/ChannelMeter/iso8601duration"
|
||||||
"github.com/asaskevich/govalidator"
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/shopspring/decimal"
|
"github.com/shopspring/decimal"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -50,11 +49,9 @@ type YoutubeVideo struct {
|
||||||
lbryChannelID string
|
lbryChannelID string
|
||||||
mocked bool
|
mocked bool
|
||||||
walletLock *sync.RWMutex
|
walletLock *sync.RWMutex
|
||||||
|
stopGroup *stop.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
const IPCooldownPeriod = 20 * time.Second
|
|
||||||
const unbanTimeout = 3 * time.Hour
|
|
||||||
|
|
||||||
var youtubeCategories = map[string]string{
|
var youtubeCategories = map[string]string{
|
||||||
"1": "film & animation",
|
"1": "film & animation",
|
||||||
"2": "autos & vehicles",
|
"2": "autos & vehicles",
|
||||||
|
@ -90,7 +87,7 @@ var youtubeCategories = map[string]string{
|
||||||
"44": "trailers",
|
"44": "trailers",
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config) *YoutubeVideo {
|
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo {
|
||||||
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
|
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
|
||||||
return &YoutubeVideo{
|
return &YoutubeVideo{
|
||||||
id: videoData.Id,
|
id: videoData.Id,
|
||||||
|
@ -103,9 +100,10 @@ func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPositio
|
||||||
awsConfig: awsConfig,
|
awsConfig: awsConfig,
|
||||||
mocked: false,
|
mocked: false,
|
||||||
youtubeChannelID: videoData.Snippet.ChannelId,
|
youtubeChannelID: videoData.Snippet.ChannelId,
|
||||||
|
stopGroup: stopGroup,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config) *YoutubeVideo {
|
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group) *YoutubeVideo {
|
||||||
return &YoutubeVideo{
|
return &YoutubeVideo{
|
||||||
id: videoID,
|
id: videoID,
|
||||||
playlistPosition: 0,
|
playlistPosition: 0,
|
||||||
|
@ -113,6 +111,7 @@ func NewMockedVideo(directory string, videoID string, youtubeChannelID string, a
|
||||||
awsConfig: awsConfig,
|
awsConfig: awsConfig,
|
||||||
mocked: true,
|
mocked: true,
|
||||||
youtubeChannelID: youtubeChannelID,
|
youtubeChannelID: youtubeChannelID,
|
||||||
|
stopGroup: stopGroup,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,93 +175,6 @@ func (v *YoutubeVideo) getAbbrevDescription() string {
|
||||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription
|
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription
|
||||||
}
|
}
|
||||||
|
|
||||||
var ipv6Pool []string
|
|
||||||
var ipv4Pool []string
|
|
||||||
var throttledIPs map[string]bool
|
|
||||||
var ipLastUsed map[string]time.Time
|
|
||||||
var ipMutex sync.Mutex
|
|
||||||
|
|
||||||
func getNextIP(ipv6 bool) (string, error) {
|
|
||||||
ipMutex.Lock()
|
|
||||||
defer ipMutex.Unlock()
|
|
||||||
if len(ipv4Pool) < 1 || len(ipv6Pool) < 1 {
|
|
||||||
addrs, err := net.InterfaceAddrs()
|
|
||||||
if err != nil {
|
|
||||||
return "", errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, address := range addrs {
|
|
||||||
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
|
||||||
if ipnet.IP.To16() != nil && govalidator.IsIPv6(ipnet.IP.String()) {
|
|
||||||
ipv6Pool = append(ipv6Pool, ipnet.IP.String())
|
|
||||||
} else if ipnet.IP.To4() != nil && govalidator.IsIPv4(ipnet.IP.String()) {
|
|
||||||
ipv4Pool = append(ipv4Pool, ipnet.IP.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nextIP := ""
|
|
||||||
if ipv6 {
|
|
||||||
nextIP = getLeastUsedIP(ipv6Pool)
|
|
||||||
} else {
|
|
||||||
nextIP = getLeastUsedIP(ipv4Pool)
|
|
||||||
}
|
|
||||||
if nextIP == "" {
|
|
||||||
return "", errors.Err("all IPs are throttled")
|
|
||||||
}
|
|
||||||
lastUse := ipLastUsed[nextIP]
|
|
||||||
if time.Since(lastUse) < IPCooldownPeriod {
|
|
||||||
time.Sleep(IPCooldownPeriod - time.Since(lastUse))
|
|
||||||
}
|
|
||||||
|
|
||||||
ipLastUsed[nextIP] = time.Now()
|
|
||||||
return nextIP, nil
|
|
||||||
}
|
|
||||||
func getLeastUsedIP(ipPool []string) string {
|
|
||||||
nextIP := ""
|
|
||||||
veryLastUse := time.Now()
|
|
||||||
for _, ip := range ipPool {
|
|
||||||
isThrottled, _ := throttledIPs[ip]
|
|
||||||
if isThrottled {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
lastUse := ipLastUsed[ip]
|
|
||||||
if lastUse.Before(veryLastUse) {
|
|
||||||
nextIP = ip
|
|
||||||
veryLastUse = lastUse
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nextIP
|
|
||||||
}
|
|
||||||
|
|
||||||
func setIpThrottled(ip string, stopGrp *stop.Group) {
|
|
||||||
ipMutex.Lock()
|
|
||||||
defer ipMutex.Unlock()
|
|
||||||
isThrottled, _ := throttledIPs[ip]
|
|
||||||
if isThrottled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
throttledIPs[ip] = true
|
|
||||||
log.Printf("%s set to throttled", ip)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
unbanTimer := time.NewTimer(unbanTimeout)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-unbanTimer.C:
|
|
||||||
throttledIPs[ip] = false
|
|
||||||
log.Printf("%s set back to not throttled", ip)
|
|
||||||
return
|
|
||||||
case <-stopGrp.Ch():
|
|
||||||
unbanTimer.Stop()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *YoutubeVideo) download(useIPv6 bool) error {
|
func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
videoPath := v.getFullPath()
|
videoPath := v.getFullPath()
|
||||||
|
|
||||||
|
@ -290,9 +202,24 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
"--merge-output-format",
|
"--merge-output-format",
|
||||||
"mp4",
|
"mp4",
|
||||||
}
|
}
|
||||||
sourceAddress, err := getNextIP(useIPv6)
|
sourceAddress, err := ipManager.GetNextIP(useIPv6)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Err(err)
|
if sourceAddress == "throttled" {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-v.stopGroup.Ch():
|
||||||
|
return errors.Err("interrupted by user")
|
||||||
|
default:
|
||||||
|
time.Sleep(20 * time.Second)
|
||||||
|
sourceAddress, err = ipManager.GetNextIP(useIPv6)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if useIPv6 {
|
if useIPv6 {
|
||||||
log.Infof("using IPv6: %s", sourceAddress)
|
log.Infof("using IPv6: %s", sourceAddress)
|
||||||
|
@ -332,6 +259,9 @@ func (v *YoutubeVideo) download(useIPv6 bool) error {
|
||||||
|
|
||||||
if err = cmd.Wait(); err != nil {
|
if err = cmd.Wait(); err != nil {
|
||||||
if strings.Contains(err.Error(), "exit status 1") {
|
if strings.Contains(err.Error(), "exit status 1") {
|
||||||
|
if strings.Contains(string(errorLog), "HTTP Error 429") {
|
||||||
|
ipManager.SetIpThrottled(sourceAddress, v.stopGroup)
|
||||||
|
}
|
||||||
return errors.Err(string(errorLog))
|
return errors.Err(string(errorLog))
|
||||||
}
|
}
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
|
|
Loading…
Add table
Reference in a new issue