From 14668c339e78f93352be77352f910ab61f76a65c Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 30 Aug 2019 21:08:28 +0200 Subject: [PATCH] refactor sync flags add disable-transfer flag --- main.go | 55 ++++++--------- manager/manager.go | 167 ++++++++++++++++++++------------------------- manager/setup.go | 2 +- manager/ytsync.go | 64 +++++++++-------- sdk/api.go | 11 +++ 5 files changed, 140 insertions(+), 159 deletions(-) diff --git a/main.go b/main.go index 51fc505..d09eea1 100644 --- a/main.go +++ b/main.go @@ -21,24 +21,18 @@ var Version string const defaultMaxTries = 3 var ( - stopOnError bool - maxTries int - takeOverExistingChannel bool - refill int - limit int - skipSpaceCheck bool - syncUpdate bool - singleRun bool - syncStatus string - channelID string - syncFrom int64 - syncUntil int64 - concurrentJobs int - videosLimit int - maxVideoSize int - maxVideoLength float64 - removeDBUnpublished bool - upgradeMetadata bool + flags sdk.SyncFlags + maxTries int + refill int + limit int + syncStatus string + channelID string + syncFrom int64 + syncUntil int64 + concurrentJobs int + videosLimit int + maxVideoSize int + maxVideoLength float64 ) func main() { @@ -52,15 +46,16 @@ func main() { Args: cobra.RangeArgs(0, 0), } - cmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") + cmd.Flags().BoolVar(&flags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") cmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") - cmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") + cmd.Flags().BoolVar(&flags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") cmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") - cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") - cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") - cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") - cmd.Flags().BoolVar(&removeDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") - cmd.Flags().BoolVar(&upgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") + cmd.Flags().BoolVar(&flags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") + cmd.Flags().BoolVar(&flags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") + cmd.Flags().BoolVar(&flags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") + cmd.Flags().BoolVar(&flags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") + cmd.Flags().BoolVar(&flags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") + cmd.Flags().BoolVar(&flags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports") cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") @@ -100,7 +95,7 @@ func ytSync(cmd *cobra.Command, args []string) { return } - if stopOnError && maxTries != defaultMaxTries { + if flags.StopOnError && maxTries != defaultMaxTries { log.Errorln("--stop-on-error and --max-tries are mutually exclusive") return } @@ -168,13 +163,10 @@ func ytSync(cmd *cobra.Command, args []string) { HostName: hostname, } sm := manager.NewSyncManager( - stopOnError, + flags, maxTries, - takeOverExistingChannel, refill, limit, - skipSpaceCheck, - syncUpdate, concurrentJobs, concurrentJobs, blobsDir, @@ -186,12 +178,9 @@ func ytSync(cmd *cobra.Command, args []string) { awsS3Region, awsS3Bucket, syncStatus, - singleRun, syncProperties, apiConfig, maxVideoLength, - removeDBUnpublished, - upgradeMetadata, ) err := sm.Start() if err != nil { diff --git a/manager/manager.go b/manager/manager.go index 7fac0db..c16bfff 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -20,61 +20,48 @@ import ( ) type SyncManager struct { - stopOnError bool - maxTries int - takeOverExistingChannel bool - refill int - limit int - skipSpaceCheck bool - syncUpdate bool - concurrentJobs int - concurrentVideos int - blobsDir string - videosLimit int - maxVideoSize int - maxVideoLength float64 - lbrycrdString string - awsS3ID string - awsS3Secret string - awsS3Region string - syncStatus string - awsS3Bucket string - singleRun bool - syncProperties *sdk.SyncProperties - apiConfig *sdk.APIConfig - removeDBUnpublished bool - upgradeMetadata bool + SyncFlags sdk.SyncFlags + maxTries int + refill int + limit int + concurrentJobs int + concurrentVideos int + blobsDir string + videosLimit int + maxVideoSize int + maxVideoLength float64 + lbrycrdString string + awsS3ID string + awsS3Secret string + awsS3Region string + syncStatus string + awsS3Bucket string + syncProperties *sdk.SyncProperties + apiConfig *sdk.APIConfig } -func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int, - skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, +func NewSyncManager(syncFlags sdk.SyncFlags, maxTries int, refill int, limit int, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, - syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64, removeDBUnpublished bool, upgradeMetadata bool) *SyncManager { + syncStatus string, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64) *SyncManager { return &SyncManager{ - stopOnError: stopOnError, - maxTries: maxTries, - takeOverExistingChannel: takeOverExistingChannel, - refill: refill, - limit: limit, - skipSpaceCheck: skipSpaceCheck, - syncUpdate: syncUpdate, - concurrentJobs: concurrentJobs, - concurrentVideos: concurrentVideos, - blobsDir: blobsDir, - videosLimit: videosLimit, - maxVideoSize: maxVideoSize, - maxVideoLength: maxVideoLength, - lbrycrdString: lbrycrdString, - awsS3ID: awsS3ID, - awsS3Secret: awsS3Secret, - awsS3Region: awsS3Region, - awsS3Bucket: awsS3Bucket, - syncStatus: syncStatus, - singleRun: singleRun, - syncProperties: syncProperties, - apiConfig: apiConfig, - removeDBUnpublished: removeDBUnpublished, - upgradeMetadata: upgradeMetadata, + SyncFlags: syncFlags, + maxTries: maxTries, + refill: refill, + limit: limit, + concurrentJobs: concurrentJobs, + concurrentVideos: concurrentVideos, + blobsDir: blobsDir, + videosLimit: videosLimit, + maxVideoSize: maxVideoSize, + maxVideoLength: maxVideoLength, + lbrycrdString: lbrycrdString, + awsS3ID: awsS3ID, + awsS3Secret: awsS3Secret, + awsS3Region: awsS3Region, + awsS3Bucket: awsS3Bucket, + syncStatus: syncStatus, + syncProperties: syncProperties, + apiConfig: apiConfig, } } @@ -139,25 +126,23 @@ func (s *SyncManager) Start() error { lbryChannelName := channels[0].DesiredChannelName syncs = make([]Sync, 1) syncs[0] = Sync{ - APIConfig: s.apiConfig, - YoutubeChannelID: s.syncProperties.YoutubeChannelID, - LbryChannelName: lbryChannelName, - lbryChannelID: channels[0].ChannelClaimID, - StopOnError: s.stopOnError, - MaxTries: s.maxTries, - ConcurrentVideos: s.concurrentVideos, - TakeOverExistingChannel: s.takeOverExistingChannel, - Refill: s.refill, - Manager: s, - LbrycrdString: s.lbrycrdString, - AwsS3ID: s.awsS3ID, - AwsS3Secret: s.awsS3Secret, - AwsS3Region: s.awsS3Region, - AwsS3Bucket: s.awsS3Bucket, - namer: namer.NewNamer(), - Fee: channels[0].Fee, - publishAddress: channels[0].PublishAddress, - transferState: channels[0].TransferState, + APIConfig: s.apiConfig, + YoutubeChannelID: s.syncProperties.YoutubeChannelID, + LbryChannelName: lbryChannelName, + lbryChannelID: channels[0].ChannelClaimID, + MaxTries: s.maxTries, + ConcurrentVideos: s.concurrentVideos, + Refill: s.refill, + Manager: s, + LbrycrdString: s.lbrycrdString, + AwsS3ID: s.awsS3ID, + AwsS3Secret: s.awsS3Secret, + AwsS3Region: s.awsS3Region, + AwsS3Bucket: s.awsS3Bucket, + namer: namer.NewNamer(), + Fee: channels[0].Fee, + publishAddress: channels[0].PublishAddress, + transferState: channels[0].TransferState, } shouldInterruptLoop = true } else { @@ -165,7 +150,7 @@ func (s *SyncManager) Start() error { //TODO: implement scrambling to avoid starvation of queues if s.syncStatus != "" { queuesToSync = append(queuesToSync, s.syncStatus) - } else if s.syncUpdate { + } else if s.SyncFlags.SyncUpdate { queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) } else { queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) @@ -183,25 +168,23 @@ func (s *SyncManager) Start() error { for i, c := range channels { log.Infof("There are %d channels in the \"%s\" queue", len(channels)-i, q) syncs = append(syncs, Sync{ - APIConfig: s.apiConfig, - YoutubeChannelID: c.ChannelId, - LbryChannelName: c.DesiredChannelName, - lbryChannelID: c.ChannelClaimID, - StopOnError: s.stopOnError, - MaxTries: s.maxTries, - ConcurrentVideos: s.concurrentVideos, - TakeOverExistingChannel: s.takeOverExistingChannel, - Refill: s.refill, - Manager: s, - LbrycrdString: s.lbrycrdString, - AwsS3ID: s.awsS3ID, - AwsS3Secret: s.awsS3Secret, - AwsS3Region: s.awsS3Region, - AwsS3Bucket: s.awsS3Bucket, - namer: namer.NewNamer(), - Fee: c.Fee, - publishAddress: c.PublishAddress, - transferState: c.TransferState, + APIConfig: s.apiConfig, + YoutubeChannelID: c.ChannelId, + LbryChannelName: c.DesiredChannelName, + lbryChannelID: c.ChannelClaimID, + MaxTries: s.maxTries, + ConcurrentVideos: s.concurrentVideos, + Refill: s.refill, + Manager: s, + LbrycrdString: s.lbrycrdString, + AwsS3ID: s.awsS3ID, + AwsS3Secret: s.awsS3Secret, + AwsS3Region: s.awsS3Region, + AwsS3Bucket: s.awsS3Bucket, + namer: namer.NewNamer(), + Fee: c.Fee, + publishAddress: c.PublishAddress, + transferState: c.TransferState, }) if q != StatusFailed { continue queues @@ -248,7 +231,7 @@ func (s *SyncManager) Start() error { break } } - if shouldInterruptLoop || s.singleRun { + if shouldInterruptLoop || s.SyncFlags.SingleRun { break } } @@ -265,7 +248,7 @@ func (s *SyncManager) checkUsedSpace() error { if err != nil { return errors.Err(err) } - if usedPctile >= 0.90 && !s.skipSpaceCheck { + if usedPctile >= 0.90 && !s.SyncFlags.SkipSpaceCheck { return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) } log.Infof("disk usage: %.1f%%", usedPctile*100) diff --git a/manager/setup.go b/manager/setup.go index cb9db4f..5a172ef 100644 --- a/manager/setup.go +++ b/manager/setup.go @@ -95,7 +95,7 @@ func (s *Sync) walletSetup() error { } unallocatedVideos := videosOnYoutube - (publishedCount + failedCount) requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelClaimAmount - if s.Manager.upgradeMetadata { + if s.Manager.SyncFlags.UpgradeMetadata { requiredBalance += float64(notUpgradedCount) * 0.001 } diff --git a/manager/ytsync.go b/manager/ytsync.go index 744ffe7..63a6279 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -63,33 +63,31 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[ // Sync stores the options that control how syncing happens type Sync struct { - APIConfig *sdk.APIConfig - YoutubeChannelID string - LbryChannelName string - StopOnError bool - MaxTries int - ConcurrentVideos int - TakeOverExistingChannel bool - Refill int - Manager *SyncManager - LbrycrdString string - AwsS3ID string - AwsS3Secret string - AwsS3Region string - AwsS3Bucket string - Fee *sdk.Fee - daemon *jsonrpc.Client - claimAddress string - videoDirectory string - syncedVideosMux *sync.RWMutex - syncedVideos map[string]sdk.SyncedVideo - grp *stop.Group - lbryChannelID string - namer *namer.Namer - walletMux *sync.RWMutex - queue chan video - transferState int - publishAddress string + APIConfig *sdk.APIConfig + YoutubeChannelID string + LbryChannelName string + MaxTries int + ConcurrentVideos int + Refill int + Manager *SyncManager + LbrycrdString string + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string + Fee *sdk.Fee + daemon *jsonrpc.Client + claimAddress string + videoDirectory string + syncedVideosMux *sync.RWMutex + syncedVideos map[string]sdk.SyncedVideo + grp *stop.Group + lbryChannelID string + namer *namer.Namer + walletMux *sync.RWMutex + queue chan video + transferState int + publishAddress string } func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) { @@ -364,7 +362,7 @@ func deleteSyncFolder(videoDirectory string) { } } func (s *Sync) shouldTransfer() bool { - return s.transferState == 1 && s.publishAddress != "" + return s.transferState == 1 && s.publishAddress != "" && !s.Manager.SyncFlags.DisableTransfers } func (s *Sync) setChannelTerminationStatus(e *error) { var transferState *int @@ -576,11 +574,11 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int } _, ok := videoIDMap[vID] if !ok && sv.Published { - log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.removeDBUnpublished) + log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.SyncFlags.RemoveDBUnpublished) idsToRemove = append(idsToRemove, vID) } } - if s.Manager.removeDBUnpublished && len(idsToRemove) > 0 { + if s.Manager.SyncFlags.RemoveDBUnpublished && len(idsToRemove) > 0 { log.Infof("removing: %s", strings.Join(idsToRemove, ",")) err := s.Manager.apiConfig.DeleteVideos(idsToRemove) if err != nil { @@ -687,7 +685,7 @@ func (s *Sync) doSync() error { } } - if s.StopOnError { + if s.Manager.SyncFlags.StopOnError { log.Println("Will stop publishing if an error is detected") } @@ -753,7 +751,7 @@ func (s *Sync) startWorker(workerNum int) { "more than 90% of the space has been used.", "Couldn't find private key for id", } - if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { + if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.SyncFlags.StopOnError { s.grp.Stop() } else if s.MaxTries > 1 { errorsNoRetry := []string{ @@ -979,7 +977,7 @@ func (s *Sync) processVideo(v video) (err error) { s.syncedVideosMux.RUnlock() newMetadataVersion := int8(2) alreadyPublished := ok && sv.Published - videoRequiresUpgrade := ok && s.Manager.upgradeMetadata && sv.MetadataVersion < newMetadataVersion + videoRequiresUpgrade := ok && s.Manager.SyncFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion neverRetryFailures := []string{ "Error extracting sts from embedded url response", diff --git a/sdk/api.go b/sdk/api.go index 06ad84a..356b312 100644 --- a/sdk/api.go +++ b/sdk/api.go @@ -34,6 +34,17 @@ type SyncProperties struct { YoutubeChannelID string } +type SyncFlags struct { + StopOnError bool + TakeOverExistingChannel bool + SkipSpaceCheck bool + SyncUpdate bool + SingleRun bool + RemoveDBUnpublished bool + UpgradeMetadata bool + DisableTransfers bool +} + type Fee struct { Amount string `json:"amount"` Address string `json:"address"`