From 60f2585f330e0cdf4353a433d3515d8491f3ea91 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Mon, 15 Jul 2019 16:16:02 -0400 Subject: [PATCH] improve throttling refactor slack wrapper cleanup dependencies --- go.mod | 2 -- go.sum | 7 ------ ipManager/throttle.go | 11 +++++---- main.go | 7 +++--- manager/manager.go | 8 +++---- manager/setup.go | 3 ++- manager/ytsync.go | 52 +++++++++++-------------------------------- util/log_wrapper.go | 34 ++++++++++++++++++++++++++++ 8 files changed, 64 insertions(+), 60 deletions(-) create mode 100644 util/log_wrapper.go diff --git a/go.mod b/go.mod index 8bf2a90..6a7829b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/lbryio/ytsync require ( cloud.google.com/go v0.37.4 // indirect github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61 - github.com/PuerkitoBio/goquery v1.5.0 // indirect github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf github.com/aws/aws-sdk-go v1.17.3 @@ -22,7 +21,6 @@ require ( github.com/lbryio/reflector.go v1.0.6-0.20190710201919-7c7ed9da72ce github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936 - github.com/nikooo777/ytdl v0.0.0-20190215151411-9c7832eaf457 github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect diff --git a/go.sum b/go.sum index ecce78c..9df757d 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61 h1:N5Vqww5QISEHsWHOWDEx4PzdIay3Cg0Jp7zItq2ZAro= github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61/go.mod h1:GnKXcK+7DYNy/8w2Ex//Uql4IgfaU82Cd5rWKb7ah00= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP7EJk= -github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -15,8 +13,6 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5Vpd github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o= -github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180713145231-3c58d8115a78 h1:mdRSArcFLfW0VoL34LZAKSz6LkkK4jFxVx2xYavACMg= @@ -228,8 +224,6 @@ github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675/go.mod h1:F github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nikooo777/ytdl v0.0.0-20190215151411-9c7832eaf457 h1:S6VzSeZkOv8pKevvKF1isVp9HF294LxVsYw5JrV0FdQ= -github.com/nikooo777/ytdl v0.0.0-20190215151411-9c7832eaf457/go.mod h1:Z/EmaCtMz3DxZsLaTnbHjqjZrOH4MHQBvvxtMObRGAY= github.com/nlopes/slack v0.2.0/go.mod h1:jVI4BBK3lSktibKahxBF74txcK2vyvkza1z/+rRnVAM= github.com/nlopes/slack v0.4.0/go.mod h1:jVI4BBK3lSktibKahxBF74txcK2vyvkza1z/+rRnVAM= github.com/nlopes/slack v0.5.0 h1:NbIae8Kd0NpqaEI3iUrsuS0KbcEDhzhc939jLW5fNm0= @@ -331,7 +325,6 @@ golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/ipManager/throttle.go b/ipManager/throttle.go index 3d3ccc2..acf038c 100644 --- a/ipManager/throttle.go +++ b/ipManager/throttle.go @@ -2,6 +2,7 @@ package ipManager import ( "github.com/asaskevich/govalidator" + "github.com/lbryio/ytsync/util" log "github.com/sirupsen/logrus" "github.com/lbryio/lbry.go/extras/errors" @@ -12,7 +13,7 @@ import ( "time" ) -const IPCooldownPeriod = 20 * time.Second +const IPCooldownPeriod = 25 * time.Second const unbanTimeout = 3 * time.Hour var ipv6Pool []string @@ -96,13 +97,13 @@ func getLeastUsedIP(ipPool []string) string { func SetIpThrottled(ip string, stopGrp *stop.Group) { ipMutex.Lock() - defer ipMutex.Unlock() isThrottled := throttledIPs[ip] if isThrottled { return } throttledIPs[ip] = true - log.Printf("%s set to throttled", ip) + ipMutex.Unlock() + util.SendErrorToSlack("%s set to throttled", ip) stopper.Add(1) go func() { @@ -110,8 +111,10 @@ func SetIpThrottled(ip string, stopGrp *stop.Group) { unbanTimer := time.NewTimer(unbanTimeout) select { case <-unbanTimer.C: + ipMutex.Lock() throttledIPs[ip] = false - log.Printf("%s set back to not throttled", ip) + ipMutex.Unlock() + util.SendInfoToSlack("%s set back to not throttled", ip) case <-stopGrp.Ch(): unbanTimer.Stop() } diff --git a/main.go b/main.go index 4b7b132..dfc5019 100644 --- a/main.go +++ b/main.go @@ -9,10 +9,11 @@ import ( "github.com/lbryio/lbry.go/extras/util" "github.com/lbryio/ytsync/sdk" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/lbryio/ytsync/manager" - log "github.com/sirupsen/logrus" + logUtils "github.com/lbryio/ytsync/util" ) var Version string @@ -197,7 +198,7 @@ func ytSync(cmd *cobra.Command, args []string) { ) err := sm.Start() if err != nil { - manager.SendErrorToSlack(err.Error()) + logUtils.SendErrorToSlack(err.Error()) } - manager.SendInfoToSlack("Syncing process terminated!") + logUtils.SendInfoToSlack("Syncing process terminated!") } diff --git a/manager/manager.go b/manager/manager.go index 8e41d62..6a74bc7 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -9,6 +9,7 @@ import ( "github.com/lbryio/ytsync/blobs_reflector" "github.com/lbryio/ytsync/namer" "github.com/lbryio/ytsync/sdk" + logUtils "github.com/lbryio/ytsync/util" "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/util" @@ -188,7 +189,7 @@ func (s *SyncManager) Start() error { } for _, sync := range syncs { shouldNotCount := false - SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) + logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) err := sync.FullCycle() if err != nil { fatalErrors := []string{ @@ -199,21 +200,20 @@ func (s *SyncManager) Start() error { "failure uploading wallet", "the channel in the wallet is different than the channel in the database", "this channel does not belong to this wallet!", - "HTTP Error 429", } if util.SubstringInSlice(err.Error(), fatalErrors) { return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err) } shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server") if !shouldNotCount { - SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) + logUtils.SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) } } err = blobs_reflector.ReflectAndClean() if err != nil { return errors.Prefix("@Nikooo777 something went wrong while reflecting blobs", err) } - SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) + logUtils.SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) if !shouldNotCount { syncCount++ } diff --git a/manager/setup.go b/manager/setup.go index b9d264c..5cf5c01 100644 --- a/manager/setup.go +++ b/manager/setup.go @@ -15,6 +15,7 @@ import ( "github.com/lbryio/ytsync/tagsManager" "github.com/lbryio/ytsync/thumbs" + logUtils "github.com/lbryio/ytsync/util" "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" @@ -267,7 +268,7 @@ func (s *Sync) ensureChannelOwnership() error { if len((*channels).Items) > 1 { // This wallet is probably not under our control anymore but we still want to publish to it // here we shall check if within all the channels there is one that was created by ytsync - SendInfoToSlack("we are dealing with a wallet that has multiple channels. This indicates that the wallet was probably transferred but we still want to sync their content. YoutubeID: %s", s.YoutubeChannelID) + logUtils.SendInfoToSlack("we are dealing with a wallet that has multiple channels. This indicates that the wallet was probably transferred but we still want to sync their content. YoutubeID: %s", s.YoutubeChannelID) if s.lbryChannelID == "" { return errors.Err("this channel does not have a recorded claimID in the database. To prevent failures, updates are not supported until an entry is manually added in the database") } diff --git a/manager/ytsync.go b/manager/ytsync.go index 1a5ed45..612ebc3 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -18,6 +18,7 @@ import ( "github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/sources" "github.com/lbryio/ytsync/thumbs" + logUtils "github.com/lbryio/ytsync/util" "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/jsonrpc" @@ -104,32 +105,6 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s } } -// SendErrorToSlack Sends an error message to the default channel and to the process log. -func SendErrorToSlack(format string, a ...interface{}) { - message := format - if len(a) > 0 { - message = fmt.Sprintf(format, a...) - } - log.Errorln(message) - err := util.SendToSlack(":sos: " + message) - if err != nil { - log.Errorln(err) - } -} - -// SendInfoToSlack Sends an info message to the default channel and to the process log. -func SendInfoToSlack(format string, a ...interface{}) { - message := format - if len(a) > 0 { - message = fmt.Sprintf(format, a...) - } - log.Infoln(message) - err := util.SendToSlack(":information_source: " + message) - if err != nil { - log.Errorln(err) - } -} - // IsInterrupted can be queried to discover if the sync process was interrupted manually func (s *Sync) IsInterrupted() bool { select { @@ -389,8 +364,8 @@ func (s *Sync) stopAndUploadWallet(e *error) { } } func logShutdownError(shutdownErr error) { - SendErrorToSlack("error shutting down daemon: %v", shutdownErr) - SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") + logUtils.SendErrorToSlack("error shutting down daemon: %v", shutdownErr) + logUtils.SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") } var thumbnailHosts = []string{ @@ -553,7 +528,7 @@ func (s *Sync) doSync() error { return errors.Prefix("error checking for duplicates", err) } if hasDupes { - SendInfoToSlack("Channel had dupes and was fixed!") + logUtils.SendInfoToSlack("Channel had dupes and was fixed!") err = s.waitForNewBlock() if err != nil { return err @@ -574,10 +549,10 @@ func (s *Sync) doSync() error { return err } if nFixed > 0 { - SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed) + logUtils.SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed) } if nRemoved > 0 { - SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved) + logUtils.SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved) } } pubsOnDB := 0 @@ -588,11 +563,11 @@ func (s *Sync) doSync() error { } if pubsOnWallet > pubsOnDB { //This case should never happen - SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) return errors.Err("not all published videos are in the database") } if pubsOnWallet < pubsOnDB { - SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) } if s.StopOnError { @@ -659,7 +634,6 @@ func (s *Sync) startWorker(workerNum int) { "Cannot publish using channel", "cannot concatenate 'str' and 'NoneType' objects", "more than 90% of the space has been used.", - "HTTP Error 429", "Couldn't find private key for id", } if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { @@ -693,7 +667,7 @@ func (s *Sync) startWorker(workerNum int) { err := s.waitForNewBlock() if err != nil { s.grp.Stop() - SendErrorToSlack("something went wrong while waiting for a block: %v", err) + logUtils.SendErrorToSlack("something went wrong while waiting for a block: %v", err) break } } else if util.SubstringInSlice(err.Error(), []string{ @@ -705,7 +679,7 @@ func (s *Sync) startWorker(workerNum int) { err := s.walletSetup() if err != nil { s.grp.Stop() - SendErrorToSlack("failed to setup the wallet for a refill: %v", err) + logUtils.SendErrorToSlack("failed to setup the wallet for a refill: %v", err) break } } else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") { @@ -714,7 +688,7 @@ func (s *Sync) startWorker(workerNum int) { log.Println("Retrying") continue } - SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) + logUtils.SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } s.syncedVideosMux.RLock() existingClaim, ok := s.syncedVideos[v.ID()] @@ -740,7 +714,7 @@ func (s *Sync) startWorker(workerNum int) { } err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), videoStatus, existingClaimID, existingClaimName, err.Error(), &existingClaimSize, 0) if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) + logUtils.SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } } break @@ -930,7 +904,7 @@ func (s *Sync) processVideo(v video) (err error) { s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size()) err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size(), 2) if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) + logUtils.SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } return nil diff --git a/util/log_wrapper.go b/util/log_wrapper.go new file mode 100644 index 0000000..7dab2c8 --- /dev/null +++ b/util/log_wrapper.go @@ -0,0 +1,34 @@ +package util + +import ( + "fmt" + + "github.com/lbryio/lbry.go/extras/util" + log "github.com/sirupsen/logrus" +) + +// SendErrorToSlack Sends an error message to the default channel and to the process log. +func SendErrorToSlack(format string, a ...interface{}) { + message := format + if len(a) > 0 { + message = fmt.Sprintf(format, a...) + } + log.Errorln(message) + err := util.SendToSlack(":sos: " + message) + if err != nil { + log.Errorln(err) + } +} + +// SendInfoToSlack Sends an info message to the default channel and to the process log. +func SendInfoToSlack(format string, a ...interface{}) { + message := format + if len(a) > 0 { + message = fmt.Sprintf(format, a...) + } + log.Infoln(message) + err := util.SendToSlack(":information_source: " + message) + if err != nil { + log.Errorln(err) + } +}