From 54d04dcf2c0cfb1702ecf36597202f1a5872c525 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 18 Dec 2019 18:22:15 +0100 Subject: [PATCH] add IP debugging improve process halting rename improper language code for hebrew --- ip_manager/throttle.go | 57 +++++++++++++++++++++++++++++------------ manager/setup.go | 3 +++ manager/ytsync.go | 3 ++- sources/youtubeVideo.go | 9 ++++--- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/ip_manager/throttle.go b/ip_manager/throttle.go index b8251f1..1b61b95 100644 --- a/ip_manager/throttle.go +++ b/ip_manager/throttle.go @@ -19,20 +19,22 @@ const unbanTimeout = 3 * time.Hour var stopper = stop.New() type IPPool struct { - ips []throttledIP - lock *sync.Mutex + ips []throttledIP + lock *sync.RWMutex + stopGrp *stop.Group } type throttledIP struct { - IP string - LastUse time.Time - Throttled bool - InUse bool + IP string + UsedForVideo string + LastUse time.Time + Throttled bool + InUse bool } var ipPoolInstance *IPPool -func GetIPPool() (*IPPool, error) { +func GetIPPool(stopGrp *stop.Group) (*IPPool, error) { if ipPoolInstance != nil { return ipPoolInstance, nil } @@ -59,9 +61,25 @@ func GetIPPool() (*IPPool, error) { } } ipPoolInstance = &IPPool{ - ips: pool, - lock: &sync.Mutex{}, + ips: pool, + lock: &sync.RWMutex{}, + stopGrp: stopGrp, } + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-stopGrp.Ch(): + return + case <-ticker.C: + ipPoolInstance.lock.RLock() + for _, ip := range ipPoolInstance.ips { + log.Debugf("IP: %s\tInUse: %t\tVideoID: %s\tThrottled: %t\tLastUse: %.1f", ip.IP, ip.InUse, ip.UsedForVideo, ip.Throttled, time.Since(ip.LastUse).Seconds()) + } + ipPoolInstance.lock.RUnlock() + } + } + }() return ipPoolInstance, nil } @@ -102,7 +120,7 @@ func (i *IPPool) ReleaseIP(ip string) { } } -func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) { +func (i *IPPool) SetThrottled(ip string) { i.lock.Lock() defer i.lock.Unlock() var tIP *throttledIP @@ -129,7 +147,7 @@ func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) { tIP.Throttled = false i.lock.Unlock() util.SendInfoToSlack("%s set back to not throttled", ip) - case <-stopGrp.Ch(): + case <-i.stopGrp.Ch(): unbanTimer.Stop() } }(tIP) @@ -138,8 +156,9 @@ func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) { var ErrAllInUse = errors.Base("all IPs are in use, try again") var ErrAllThrottled = errors.Base("all IPs are throttled") var ErrResourceLock = errors.Base("error getting next ip, did you forget to lock on the resource?") +var ErrInterruptedByUser = errors.Base("interrupted by user") -func (i *IPPool) nextIP() (*throttledIP, error) { +func (i *IPPool) nextIP(forVideo string) (*throttledIP, error) { i.lock.Lock() defer i.lock.Unlock() @@ -165,18 +184,24 @@ func (i *IPPool) nextIP() (*throttledIP, error) { return nil, errors.Err(ErrResourceLock) } nextIP.InUse = true + nextIP.UsedForVideo = forVideo return nextIP, nil } return nil, errors.Err(ErrAllThrottled) } -func (i *IPPool) GetIP() (string, error) { +func (i *IPPool) GetIP(forVideo string) (string, error) { for { - ip, err := i.nextIP() + ip, err := i.nextIP(forVideo) if err != nil { if errors.Is(err, ErrAllInUse) { - time.Sleep(5 * time.Second) - continue + select { + case <-i.stopGrp.Ch(): + return "", errors.Err(ErrInterruptedByUser) + default: + time.Sleep(5 * time.Second) + continue + } } else if errors.Is(err, ErrAllThrottled) { return "throttled", err } diff --git a/manager/setup.go b/manager/setup.go index 5fa6dd9..82f0ad3 100644 --- a/manager/setup.go +++ b/manager/setup.go @@ -399,6 +399,9 @@ func (s *Sync) ensureChannelOwnership() error { var languages []string = nil if channelInfo.DefaultLanguage != "" { + if channelInfo.DefaultLanguage == "iw" { + channelInfo.DefaultLanguage = "he" + } languages = []string{channelInfo.DefaultLanguage} } var locations []jsonrpc.Location = nil diff --git a/manager/ytsync.go b/manager/ytsync.go index ac96bd4..077b0c0 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -873,6 +873,7 @@ func (s *Sync) startWorker(workerNum int) { "Sorry about that", "This video is not available", "requested format not available", + "interrupted by user", } if util.SubstringInSlice(err.Error(), errorsNoRetry) { log.Println("This error should not be retried at all") @@ -975,7 +976,7 @@ func (s *Sync) enqueueYoutubeVideos() error { } var videos []video - ipPool, err := ip_manager.GetIPPool() + ipPool, err := ip_manager.GetIPPool(s.grp) if err != nil { return err } diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 2135a59..0fc78d2 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -224,7 +224,7 @@ func (v *YoutubeVideo) download() error { ) } - sourceAddress, err := v.pool.GetIP() + sourceAddress, err := v.pool.GetIP(v.id) if err != nil { if sourceAddress == "throttled" { for { @@ -235,7 +235,7 @@ func (v *YoutubeVideo) download() error { } time.Sleep(ip_manager.IPCooldownPeriod) - sourceAddress, err = v.pool.GetIP() + sourceAddress, err = v.pool.GetIP(v.id) if err == nil { break } else if !errors.Is(err, ip_manager.ErrAllThrottled) { @@ -278,7 +278,7 @@ runcmd: if err = cmd.Wait(); err != nil { if strings.Contains(err.Error(), "exit status 1") { if strings.Contains(string(errorLog), "HTTP Error 429") || strings.Contains(string(errorLog), "returned non-zero exit status 8") { - v.pool.SetThrottled(sourceAddress, v.stopGroup) + v.pool.SetThrottled(sourceAddress) } else if strings.Contains(string(errorLog), "giving up after 0 fragment retries") && qualityIndex < len(qualities)-1 { qualityIndex++ goto runcmd //this bypasses the yt throttling IP redistribution... TODO: don't @@ -459,6 +459,9 @@ func (v *YoutubeVideo) getMetadata() (languages []string, locations []jsonrpc.Lo tags = nil if !v.mocked { if v.youtubeInfo.Snippet.DefaultLanguage != "" { + if v.youtubeInfo.Snippet.DefaultLanguage == "iw" { + v.youtubeInfo.Snippet.DefaultLanguage = "he" + } languages = []string{v.youtubeInfo.Snippet.DefaultLanguage} }