add IP debugging

improve process halting
rename improper language code for hebrew
This commit is contained in:
Niko Storni 2019-12-18 18:22:15 +01:00
parent 65c4f99c1c
commit 54d04dcf2c
4 changed files with 52 additions and 20 deletions

View file

@ -19,20 +19,22 @@ const unbanTimeout = 3 * time.Hour
var stopper = stop.New() var stopper = stop.New()
type IPPool struct { type IPPool struct {
ips []throttledIP ips []throttledIP
lock *sync.Mutex lock *sync.RWMutex
stopGrp *stop.Group
} }
type throttledIP struct { type throttledIP struct {
IP string IP string
LastUse time.Time UsedForVideo string
Throttled bool LastUse time.Time
InUse bool Throttled bool
InUse bool
} }
var ipPoolInstance *IPPool var ipPoolInstance *IPPool
func GetIPPool() (*IPPool, error) { func GetIPPool(stopGrp *stop.Group) (*IPPool, error) {
if ipPoolInstance != nil { if ipPoolInstance != nil {
return ipPoolInstance, nil return ipPoolInstance, nil
} }
@ -59,9 +61,25 @@ func GetIPPool() (*IPPool, error) {
} }
} }
ipPoolInstance = &IPPool{ ipPoolInstance = &IPPool{
ips: pool, ips: pool,
lock: &sync.Mutex{}, lock: &sync.RWMutex{},
stopGrp: stopGrp,
} }
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-stopGrp.Ch():
return
case <-ticker.C:
ipPoolInstance.lock.RLock()
for _, ip := range ipPoolInstance.ips {
log.Debugf("IP: %s\tInUse: %t\tVideoID: %s\tThrottled: %t\tLastUse: %.1f", ip.IP, ip.InUse, ip.UsedForVideo, ip.Throttled, time.Since(ip.LastUse).Seconds())
}
ipPoolInstance.lock.RUnlock()
}
}
}()
return ipPoolInstance, nil return ipPoolInstance, nil
} }
@ -102,7 +120,7 @@ func (i *IPPool) ReleaseIP(ip string) {
} }
} }
func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) { func (i *IPPool) SetThrottled(ip string) {
i.lock.Lock() i.lock.Lock()
defer i.lock.Unlock() defer i.lock.Unlock()
var tIP *throttledIP var tIP *throttledIP
@ -129,7 +147,7 @@ func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) {
tIP.Throttled = false tIP.Throttled = false
i.lock.Unlock() i.lock.Unlock()
util.SendInfoToSlack("%s set back to not throttled", ip) util.SendInfoToSlack("%s set back to not throttled", ip)
case <-stopGrp.Ch(): case <-i.stopGrp.Ch():
unbanTimer.Stop() unbanTimer.Stop()
} }
}(tIP) }(tIP)
@ -138,8 +156,9 @@ func (i *IPPool) SetThrottled(ip string, stopGrp *stop.Group) {
var ErrAllInUse = errors.Base("all IPs are in use, try again") var ErrAllInUse = errors.Base("all IPs are in use, try again")
var ErrAllThrottled = errors.Base("all IPs are throttled") var ErrAllThrottled = errors.Base("all IPs are throttled")
var ErrResourceLock = errors.Base("error getting next ip, did you forget to lock on the resource?") var ErrResourceLock = errors.Base("error getting next ip, did you forget to lock on the resource?")
var ErrInterruptedByUser = errors.Base("interrupted by user")
func (i *IPPool) nextIP() (*throttledIP, error) { func (i *IPPool) nextIP(forVideo string) (*throttledIP, error) {
i.lock.Lock() i.lock.Lock()
defer i.lock.Unlock() defer i.lock.Unlock()
@ -165,18 +184,24 @@ func (i *IPPool) nextIP() (*throttledIP, error) {
return nil, errors.Err(ErrResourceLock) return nil, errors.Err(ErrResourceLock)
} }
nextIP.InUse = true nextIP.InUse = true
nextIP.UsedForVideo = forVideo
return nextIP, nil return nextIP, nil
} }
return nil, errors.Err(ErrAllThrottled) return nil, errors.Err(ErrAllThrottled)
} }
func (i *IPPool) GetIP() (string, error) { func (i *IPPool) GetIP(forVideo string) (string, error) {
for { for {
ip, err := i.nextIP() ip, err := i.nextIP(forVideo)
if err != nil { if err != nil {
if errors.Is(err, ErrAllInUse) { if errors.Is(err, ErrAllInUse) {
time.Sleep(5 * time.Second) select {
continue case <-i.stopGrp.Ch():
return "", errors.Err(ErrInterruptedByUser)
default:
time.Sleep(5 * time.Second)
continue
}
} else if errors.Is(err, ErrAllThrottled) { } else if errors.Is(err, ErrAllThrottled) {
return "throttled", err return "throttled", err
} }

View file

@ -399,6 +399,9 @@ func (s *Sync) ensureChannelOwnership() error {
var languages []string = nil var languages []string = nil
if channelInfo.DefaultLanguage != "" { if channelInfo.DefaultLanguage != "" {
if channelInfo.DefaultLanguage == "iw" {
channelInfo.DefaultLanguage = "he"
}
languages = []string{channelInfo.DefaultLanguage} languages = []string{channelInfo.DefaultLanguage}
} }
var locations []jsonrpc.Location = nil var locations []jsonrpc.Location = nil

View file

@ -873,6 +873,7 @@ func (s *Sync) startWorker(workerNum int) {
"Sorry about that", "Sorry about that",
"This video is not available", "This video is not available",
"requested format not available", "requested format not available",
"interrupted by user",
} }
if util.SubstringInSlice(err.Error(), errorsNoRetry) { if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all") log.Println("This error should not be retried at all")
@ -975,7 +976,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
} }
var videos []video var videos []video
ipPool, err := ip_manager.GetIPPool() ipPool, err := ip_manager.GetIPPool(s.grp)
if err != nil { if err != nil {
return err return err
} }

View file

@ -224,7 +224,7 @@ func (v *YoutubeVideo) download() error {
) )
} }
sourceAddress, err := v.pool.GetIP() sourceAddress, err := v.pool.GetIP(v.id)
if err != nil { if err != nil {
if sourceAddress == "throttled" { if sourceAddress == "throttled" {
for { for {
@ -235,7 +235,7 @@ func (v *YoutubeVideo) download() error {
} }
time.Sleep(ip_manager.IPCooldownPeriod) time.Sleep(ip_manager.IPCooldownPeriod)
sourceAddress, err = v.pool.GetIP() sourceAddress, err = v.pool.GetIP(v.id)
if err == nil { if err == nil {
break break
} else if !errors.Is(err, ip_manager.ErrAllThrottled) { } else if !errors.Is(err, ip_manager.ErrAllThrottled) {
@ -278,7 +278,7 @@ runcmd:
if err = cmd.Wait(); err != nil { if err = cmd.Wait(); err != nil {
if strings.Contains(err.Error(), "exit status 1") { if strings.Contains(err.Error(), "exit status 1") {
if strings.Contains(string(errorLog), "HTTP Error 429") || strings.Contains(string(errorLog), "returned non-zero exit status 8") { if strings.Contains(string(errorLog), "HTTP Error 429") || strings.Contains(string(errorLog), "returned non-zero exit status 8") {
v.pool.SetThrottled(sourceAddress, v.stopGroup) v.pool.SetThrottled(sourceAddress)
} else if strings.Contains(string(errorLog), "giving up after 0 fragment retries") && qualityIndex < len(qualities)-1 { } else if strings.Contains(string(errorLog), "giving up after 0 fragment retries") && qualityIndex < len(qualities)-1 {
qualityIndex++ qualityIndex++
goto runcmd //this bypasses the yt throttling IP redistribution... TODO: don't goto runcmd //this bypasses the yt throttling IP redistribution... TODO: don't
@ -459,6 +459,9 @@ func (v *YoutubeVideo) getMetadata() (languages []string, locations []jsonrpc.Lo
tags = nil tags = nil
if !v.mocked { if !v.mocked {
if v.youtubeInfo.Snippet.DefaultLanguage != "" { if v.youtubeInfo.Snippet.DefaultLanguage != "" {
if v.youtubeInfo.Snippet.DefaultLanguage == "iw" {
v.youtubeInfo.Snippet.DefaultLanguage = "he"
}
languages = []string{v.youtubeInfo.Snippet.DefaultLanguage} languages = []string{v.youtubeInfo.Snippet.DefaultLanguage}
} }