From f05522a30842018de8c8bc8ea9d394ff34373af4 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 21 Aug 2018 13:17:52 -0400 Subject: [PATCH] fix memory leak fix missing ptr dereference add single-run startup param stop counting channels that aren't actually considered in the sync process --- cmd/ytsync.go | 3 +++ ytsync/manager.go | 15 ++++++++++----- ytsync/ytsync.go | 3 ++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 35fa489..89c1bcd 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -23,6 +23,7 @@ var ( limit int skipSpaceCheck bool syncUpdate bool + singleRun bool syncStatus string channelID string syncFrom int64 @@ -45,6 +46,7 @@ func init() { ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") + ytSyncCmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") ytSyncCmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") @@ -165,6 +167,7 @@ func ytSync(cmd *cobra.Command, args []string) { AwsS3Secret: awsS3Secret, AwsS3Region: awsS3Region, AwsS3Bucket: awsS3Bucket, + SingleRun: singleRun, } err := sm.Start() diff --git a/ytsync/manager.go b/ytsync/manager.go index 0827344..cc0be28 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -7,10 +7,9 @@ import ( "net/http" "net/url" "strconv" - - "time" - + "strings" "syscall" + "time" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/null" @@ -44,6 +43,7 @@ type SyncManager struct { AwsS3Secret string AwsS3Region string AwsS3Bucket string + SingleRun bool } const ( @@ -270,6 +270,7 @@ func (s SyncManager) Start() error { time.Sleep(5 * time.Minute) } for i, sync := range syncs { + shouldNotCount := false SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) err := sync.FullCycle() if err != nil { @@ -278,21 +279,25 @@ func (s SyncManager) Start() error { "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR", "NotEnoughFunds", "no space left on device", + "failure uploading wallet", } if util.SubstringInSlice(err.Error(), fatalErrors) { return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err) } SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) + shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server") } SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) - syncCount++ + if !shouldNotCount { + syncCount++ + } if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { shouldInterruptLoop = true break } } - if shouldInterruptLoop { + if shouldInterruptLoop || s.SingleRun { break } } diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index e32f7c9..2df409d 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -230,6 +230,7 @@ func (s *Sync) FullCycle() (e error) { s.queue = make(chan video) interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(interruptChan) go func() { <-interruptChan log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") @@ -345,7 +346,7 @@ func (s *Sync) stopAndUploadWallet(e *error) { e = &err return } else { - *e = errors.Prefix("failure uploading wallet: ", e) + *e = errors.Prefix("failure uploading wallet: ", *e) } } }