fix memory leak

fix missing ptr dereference
add single-run startup param
stop counting channels that aren't actually considered in the sync process
This commit is contained in:
Niko Storni 2018-08-21 13:17:52 -04:00
parent b970af7b7d
commit f05522a308
No known key found for this signature in database
GPG key ID: F37FE63398800368
3 changed files with 15 additions and 6 deletions

View file

@ -23,6 +23,7 @@ var (
limit int
skipSpaceCheck bool
syncUpdate bool
singleRun bool
syncStatus string
channelID string
syncFrom int64
@ -45,6 +46,7 @@ func init() {
ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
ytSyncCmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
ytSyncCmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
@ -165,6 +167,7 @@ func ytSync(cmd *cobra.Command, args []string) {
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
SingleRun: singleRun,
}
err := sm.Start()

View file

@ -7,10 +7,9 @@ import (
"net/http"
"net/url"
"strconv"
"time"
"strings"
"syscall"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
@ -44,6 +43,7 @@ type SyncManager struct {
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
SingleRun bool
}
const (
@ -270,6 +270,7 @@ func (s SyncManager) Start() error {
time.Sleep(5 * time.Minute)
}
for i, sync := range syncs {
shouldNotCount := false
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total session iterations: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
err := sync.FullCycle()
if err != nil {
@ -278,21 +279,25 @@ func (s SyncManager) Start() error {
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
"NotEnoughFunds",
"no space left on device",
"failure uploading wallet",
}
if util.SubstringInSlice(err.Error(), fatalErrors) {
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
}
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server")
}
SendInfoToSlack("Syncing %s (%s) reached an end. (Iteration %d/%d - total session iterations: %d))", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
syncCount++
if !shouldNotCount {
syncCount++
}
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
shouldInterruptLoop = true
break
}
}
if shouldInterruptLoop {
if shouldInterruptLoop || s.SingleRun {
break
}
}

View file

@ -230,6 +230,7 @@ func (s *Sync) FullCycle() (e error) {
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interruptChan)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
@ -345,7 +346,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
e = &err
return
} else {
*e = errors.Prefix("failure uploading wallet: ", e)
*e = errors.Prefix("failure uploading wallet: ", *e)
}
}
}