improve throttling

refactor slack wrapper
cleanup dependencies
This commit is contained in:
Niko Storni 2019-07-15 16:16:02 -04:00
parent df08d42d9b
commit 60f2585f33
8 changed files with 64 additions and 60 deletions

2
go.mod
View file

@ -3,7 +3,6 @@ module github.com/lbryio/ytsync
require (
cloud.google.com/go v0.37.4 // indirect
github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61
github.com/PuerkitoBio/goquery v1.5.0 // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf
github.com/aws/aws-sdk-go v1.17.3
@ -22,7 +21,6 @@ require (
github.com/lbryio/reflector.go v1.0.6-0.20190710201919-7c7ed9da72ce
github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/nikooo777/ytdl v0.0.0-20190215151411-9c7832eaf457
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect

7
go.sum
View file

@ -6,8 +6,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61 h1:N5Vqww5QISEHsWHOWDEx4PzdIay3Cg0Jp7zItq2ZAro=
github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61/go.mod h1:GnKXcK+7DYNy/8w2Ex//Uql4IgfaU82Cd5rWKb7ah00=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP7EJk=
github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
@ -15,8 +13,6 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5Vpd
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o=
github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180713145231-3c58d8115a78 h1:mdRSArcFLfW0VoL34LZAKSz6LkkK4jFxVx2xYavACMg=
@ -228,8 +224,6 @@ github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675/go.mod h1:F
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nikooo777/ytdl v0.0.0-20190215151411-9c7832eaf457 h1:S6VzSeZkOv8pKevvKF1isVp9HF294LxVsYw5JrV0FdQ=
github.com/nikooo777/ytdl v0.0.0-20190215151411-9c7832eaf457/go.mod h1:Z/EmaCtMz3DxZsLaTnbHjqjZrOH4MHQBvvxtMObRGAY=
github.com/nlopes/slack v0.2.0/go.mod h1:jVI4BBK3lSktibKahxBF74txcK2vyvkza1z/+rRnVAM=
github.com/nlopes/slack v0.4.0/go.mod h1:jVI4BBK3lSktibKahxBF74txcK2vyvkza1z/+rRnVAM=
github.com/nlopes/slack v0.5.0 h1:NbIae8Kd0NpqaEI3iUrsuS0KbcEDhzhc939jLW5fNm0=
@ -331,7 +325,6 @@ golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View file

@ -2,6 +2,7 @@ package ipManager
import (
"github.com/asaskevich/govalidator"
"github.com/lbryio/ytsync/util"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/extras/errors"
@ -12,7 +13,7 @@ import (
"time"
)
const IPCooldownPeriod = 20 * time.Second
const IPCooldownPeriod = 25 * time.Second
const unbanTimeout = 3 * time.Hour
var ipv6Pool []string
@ -96,13 +97,13 @@ func getLeastUsedIP(ipPool []string) string {
func SetIpThrottled(ip string, stopGrp *stop.Group) {
ipMutex.Lock()
defer ipMutex.Unlock()
isThrottled := throttledIPs[ip]
if isThrottled {
return
}
throttledIPs[ip] = true
log.Printf("%s set to throttled", ip)
ipMutex.Unlock()
util.SendErrorToSlack("%s set to throttled", ip)
stopper.Add(1)
go func() {
@ -110,8 +111,10 @@ func SetIpThrottled(ip string, stopGrp *stop.Group) {
unbanTimer := time.NewTimer(unbanTimeout)
select {
case <-unbanTimer.C:
ipMutex.Lock()
throttledIPs[ip] = false
log.Printf("%s set back to not throttled", ip)
ipMutex.Unlock()
util.SendInfoToSlack("%s set back to not throttled", ip)
case <-stopGrp.Ch():
unbanTimer.Stop()
}

View file

@ -9,10 +9,11 @@ import (
"github.com/lbryio/lbry.go/extras/util"
"github.com/lbryio/ytsync/sdk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/lbryio/ytsync/manager"
log "github.com/sirupsen/logrus"
logUtils "github.com/lbryio/ytsync/util"
)
var Version string
@ -197,7 +198,7 @@ func ytSync(cmd *cobra.Command, args []string) {
)
err := sm.Start()
if err != nil {
manager.SendErrorToSlack(err.Error())
logUtils.SendErrorToSlack(err.Error())
}
manager.SendInfoToSlack("Syncing process terminated!")
logUtils.SendInfoToSlack("Syncing process terminated!")
}

View file

@ -9,6 +9,7 @@ import (
"github.com/lbryio/ytsync/blobs_reflector"
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
logUtils "github.com/lbryio/ytsync/util"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/util"
@ -188,7 +189,7 @@ func (s *SyncManager) Start() error {
}
for _, sync := range syncs {
shouldNotCount := false
SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
err := sync.FullCycle()
if err != nil {
fatalErrors := []string{
@ -199,21 +200,20 @@ func (s *SyncManager) Start() error {
"failure uploading wallet",
"the channel in the wallet is different than the channel in the database",
"this channel does not belong to this wallet!",
"HTTP Error 429",
}
if util.SubstringInSlice(err.Error(), fatalErrors) {
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
}
shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server")
if !shouldNotCount {
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
logUtils.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
}
}
err = blobs_reflector.ReflectAndClean()
if err != nil {
return errors.Prefix("@Nikooo777 something went wrong while reflecting blobs", err)
}
SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
logUtils.SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
if !shouldNotCount {
syncCount++
}

View file

@ -15,6 +15,7 @@ import (
"github.com/lbryio/ytsync/tagsManager"
"github.com/lbryio/ytsync/thumbs"
logUtils "github.com/lbryio/ytsync/util"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
@ -267,7 +268,7 @@ func (s *Sync) ensureChannelOwnership() error {
if len((*channels).Items) > 1 {
// This wallet is probably not under our control anymore but we still want to publish to it
// here we shall check if within all the channels there is one that was created by ytsync
SendInfoToSlack("we are dealing with a wallet that has multiple channels. This indicates that the wallet was probably transferred but we still want to sync their content. YoutubeID: %s", s.YoutubeChannelID)
logUtils.SendInfoToSlack("we are dealing with a wallet that has multiple channels. This indicates that the wallet was probably transferred but we still want to sync their content. YoutubeID: %s", s.YoutubeChannelID)
if s.lbryChannelID == "" {
return errors.Err("this channel does not have a recorded claimID in the database. To prevent failures, updates are not supported until an entry is manually added in the database")
}

View file

@ -18,6 +18,7 @@ import (
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
"github.com/lbryio/ytsync/thumbs"
logUtils "github.com/lbryio/ytsync/util"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
@ -104,32 +105,6 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
}
}
// SendErrorToSlack Sends an error message to the default channel and to the process log.
func SendErrorToSlack(format string, a ...interface{}) {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
err := util.SendToSlack(":sos: " + message)
if err != nil {
log.Errorln(err)
}
}
// SendInfoToSlack Sends an info message to the default channel and to the process log.
func SendInfoToSlack(format string, a ...interface{}) {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
err := util.SendToSlack(":information_source: " + message)
if err != nil {
log.Errorln(err)
}
}
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func (s *Sync) IsInterrupted() bool {
select {
@ -389,8 +364,8 @@ func (s *Sync) stopAndUploadWallet(e *error) {
}
}
func logShutdownError(shutdownErr error) {
SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
logUtils.SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
logUtils.SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
var thumbnailHosts = []string{
@ -553,7 +528,7 @@ func (s *Sync) doSync() error {
return errors.Prefix("error checking for duplicates", err)
}
if hasDupes {
SendInfoToSlack("Channel had dupes and was fixed!")
logUtils.SendInfoToSlack("Channel had dupes and was fixed!")
err = s.waitForNewBlock()
if err != nil {
return err
@ -574,10 +549,10 @@ func (s *Sync) doSync() error {
return err
}
if nFixed > 0 {
SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed)
logUtils.SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed)
}
if nRemoved > 0 {
SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved)
logUtils.SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved)
}
}
pubsOnDB := 0
@ -588,11 +563,11 @@ func (s *Sync) doSync() error {
}
if pubsOnWallet > pubsOnDB { //This case should never happen
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
if s.StopOnError {
@ -659,7 +634,6 @@ func (s *Sync) startWorker(workerNum int) {
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
"HTTP Error 429",
"Couldn't find private key for id",
}
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
@ -693,7 +667,7 @@ func (s *Sync) startWorker(workerNum int) {
err := s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
logUtils.SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if util.SubstringInSlice(err.Error(), []string{
@ -705,7 +679,7 @@ func (s *Sync) startWorker(workerNum int) {
err := s.walletSetup()
if err != nil {
s.grp.Stop()
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
logUtils.SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
} else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") {
@ -714,7 +688,7 @@ func (s *Sync) startWorker(workerNum int) {
log.Println("Retrying")
continue
}
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
logUtils.SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.syncedVideosMux.RLock()
existingClaim, ok := s.syncedVideos[v.ID()]
@ -740,7 +714,7 @@ func (s *Sync) startWorker(workerNum int) {
}
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), videoStatus, existingClaimID, existingClaimName, err.Error(), &existingClaimSize, 0)
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
logUtils.SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
}
break
@ -930,7 +904,7 @@ func (s *Sync) processVideo(v video) (err error) {
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size())
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size(), 2)
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
logUtils.SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
return nil

34
util/log_wrapper.go Normal file
View file

@ -0,0 +1,34 @@
package util
import (
"fmt"
"github.com/lbryio/lbry.go/extras/util"
log "github.com/sirupsen/logrus"
)
// SendErrorToSlack Sends an error message to the default channel and to the process log.
func SendErrorToSlack(format string, a ...interface{}) {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
err := util.SendToSlack(":sos: " + message)
if err != nil {
log.Errorln(err)
}
}
// SendInfoToSlack Sends an info message to the default channel and to the process log.
func SendInfoToSlack(format string, a ...interface{}) {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
err := util.SendToSlack(":information_source: " + message)
if err != nil {
log.Errorln(err)
}
}