refactor sync flags

add disable-transfer flag
This commit is contained in:
Niko Storni 2019-08-30 21:08:28 +02:00
parent 7d38aa7b29
commit 14668c339e
5 changed files with 140 additions and 159 deletions

55
main.go
View file

@ -21,24 +21,18 @@ var Version string
const defaultMaxTries = 3 const defaultMaxTries = 3
var ( var (
stopOnError bool flags sdk.SyncFlags
maxTries int maxTries int
takeOverExistingChannel bool refill int
refill int limit int
limit int syncStatus string
skipSpaceCheck bool channelID string
syncUpdate bool syncFrom int64
singleRun bool syncUntil int64
syncStatus string concurrentJobs int
channelID string videosLimit int
syncFrom int64 maxVideoSize int
syncUntil int64 maxVideoLength float64
concurrentJobs int
videosLimit int
maxVideoSize int
maxVideoLength float64
removeDBUnpublished bool
upgradeMetadata bool
) )
func main() { func main() {
@ -52,15 +46,16 @@ func main() {
Args: cobra.RangeArgs(0, 0), Args: cobra.RangeArgs(0, 0),
} }
cmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") cmd.Flags().BoolVar(&flags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
cmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") cmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
cmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") cmd.Flags().BoolVar(&flags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
cmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") cmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") cmd.Flags().BoolVar(&flags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") cmd.Flags().BoolVar(&flags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") cmd.Flags().BoolVar(&flags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&removeDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") cmd.Flags().BoolVar(&flags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&upgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") cmd.Flags().BoolVar(&flags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().BoolVar(&flags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports")
cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
@ -100,7 +95,7 @@ func ytSync(cmd *cobra.Command, args []string) {
return return
} }
if stopOnError && maxTries != defaultMaxTries { if flags.StopOnError && maxTries != defaultMaxTries {
log.Errorln("--stop-on-error and --max-tries are mutually exclusive") log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
return return
} }
@ -168,13 +163,10 @@ func ytSync(cmd *cobra.Command, args []string) {
HostName: hostname, HostName: hostname,
} }
sm := manager.NewSyncManager( sm := manager.NewSyncManager(
stopOnError, flags,
maxTries, maxTries,
takeOverExistingChannel,
refill, refill,
limit, limit,
skipSpaceCheck,
syncUpdate,
concurrentJobs, concurrentJobs,
concurrentJobs, concurrentJobs,
blobsDir, blobsDir,
@ -186,12 +178,9 @@ func ytSync(cmd *cobra.Command, args []string) {
awsS3Region, awsS3Region,
awsS3Bucket, awsS3Bucket,
syncStatus, syncStatus,
singleRun,
syncProperties, syncProperties,
apiConfig, apiConfig,
maxVideoLength, maxVideoLength,
removeDBUnpublished,
upgradeMetadata,
) )
err := sm.Start() err := sm.Start()
if err != nil { if err != nil {

View file

@ -20,61 +20,48 @@ import (
) )
type SyncManager struct { type SyncManager struct {
stopOnError bool SyncFlags sdk.SyncFlags
maxTries int maxTries int
takeOverExistingChannel bool refill int
refill int limit int
limit int concurrentJobs int
skipSpaceCheck bool concurrentVideos int
syncUpdate bool blobsDir string
concurrentJobs int videosLimit int
concurrentVideos int maxVideoSize int
blobsDir string maxVideoLength float64
videosLimit int lbrycrdString string
maxVideoSize int awsS3ID string
maxVideoLength float64 awsS3Secret string
lbrycrdString string awsS3Region string
awsS3ID string syncStatus string
awsS3Secret string awsS3Bucket string
awsS3Region string syncProperties *sdk.SyncProperties
syncStatus string apiConfig *sdk.APIConfig
awsS3Bucket string
singleRun bool
syncProperties *sdk.SyncProperties
apiConfig *sdk.APIConfig
removeDBUnpublished bool
upgradeMetadata bool
} }
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int, func NewSyncManager(syncFlags sdk.SyncFlags, maxTries int, refill int, limit int, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64, removeDBUnpublished bool, upgradeMetadata bool) *SyncManager { syncStatus string, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64) *SyncManager {
return &SyncManager{ return &SyncManager{
stopOnError: stopOnError, SyncFlags: syncFlags,
maxTries: maxTries, maxTries: maxTries,
takeOverExistingChannel: takeOverExistingChannel, refill: refill,
refill: refill, limit: limit,
limit: limit, concurrentJobs: concurrentJobs,
skipSpaceCheck: skipSpaceCheck, concurrentVideos: concurrentVideos,
syncUpdate: syncUpdate, blobsDir: blobsDir,
concurrentJobs: concurrentJobs, videosLimit: videosLimit,
concurrentVideos: concurrentVideos, maxVideoSize: maxVideoSize,
blobsDir: blobsDir, maxVideoLength: maxVideoLength,
videosLimit: videosLimit, lbrycrdString: lbrycrdString,
maxVideoSize: maxVideoSize, awsS3ID: awsS3ID,
maxVideoLength: maxVideoLength, awsS3Secret: awsS3Secret,
lbrycrdString: lbrycrdString, awsS3Region: awsS3Region,
awsS3ID: awsS3ID, awsS3Bucket: awsS3Bucket,
awsS3Secret: awsS3Secret, syncStatus: syncStatus,
awsS3Region: awsS3Region, syncProperties: syncProperties,
awsS3Bucket: awsS3Bucket, apiConfig: apiConfig,
syncStatus: syncStatus,
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
removeDBUnpublished: removeDBUnpublished,
upgradeMetadata: upgradeMetadata,
} }
} }
@ -139,25 +126,23 @@ func (s *SyncManager) Start() error {
lbryChannelName := channels[0].DesiredChannelName lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1) syncs = make([]Sync, 1)
syncs[0] = Sync{ syncs[0] = Sync{
APIConfig: s.apiConfig, APIConfig: s.apiConfig,
YoutubeChannelID: s.syncProperties.YoutubeChannelID, YoutubeChannelID: s.syncProperties.YoutubeChannelID,
LbryChannelName: lbryChannelName, LbryChannelName: lbryChannelName,
lbryChannelID: channels[0].ChannelClaimID, lbryChannelID: channels[0].ChannelClaimID,
StopOnError: s.stopOnError, MaxTries: s.maxTries,
MaxTries: s.maxTries, ConcurrentVideos: s.concurrentVideos,
ConcurrentVideos: s.concurrentVideos, Refill: s.refill,
TakeOverExistingChannel: s.takeOverExistingChannel, Manager: s,
Refill: s.refill, LbrycrdString: s.lbrycrdString,
Manager: s, AwsS3ID: s.awsS3ID,
LbrycrdString: s.lbrycrdString, AwsS3Secret: s.awsS3Secret,
AwsS3ID: s.awsS3ID, AwsS3Region: s.awsS3Region,
AwsS3Secret: s.awsS3Secret, AwsS3Bucket: s.awsS3Bucket,
AwsS3Region: s.awsS3Region, namer: namer.NewNamer(),
AwsS3Bucket: s.awsS3Bucket, Fee: channels[0].Fee,
namer: namer.NewNamer(), publishAddress: channels[0].PublishAddress,
Fee: channels[0].Fee, transferState: channels[0].TransferState,
publishAddress: channels[0].PublishAddress,
transferState: channels[0].TransferState,
} }
shouldInterruptLoop = true shouldInterruptLoop = true
} else { } else {
@ -165,7 +150,7 @@ func (s *SyncManager) Start() error {
//TODO: implement scrambling to avoid starvation of queues //TODO: implement scrambling to avoid starvation of queues
if s.syncStatus != "" { if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.syncStatus) queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.syncUpdate { } else if s.SyncFlags.SyncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else { } else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
@ -183,25 +168,23 @@ func (s *SyncManager) Start() error {
for i, c := range channels { for i, c := range channels {
log.Infof("There are %d channels in the \"%s\" queue", len(channels)-i, q) log.Infof("There are %d channels in the \"%s\" queue", len(channels)-i, q)
syncs = append(syncs, Sync{ syncs = append(syncs, Sync{
APIConfig: s.apiConfig, APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId, YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName, LbryChannelName: c.DesiredChannelName,
lbryChannelID: c.ChannelClaimID, lbryChannelID: c.ChannelClaimID,
StopOnError: s.stopOnError, MaxTries: s.maxTries,
MaxTries: s.maxTries, ConcurrentVideos: s.concurrentVideos,
ConcurrentVideos: s.concurrentVideos, Refill: s.refill,
TakeOverExistingChannel: s.takeOverExistingChannel, Manager: s,
Refill: s.refill, LbrycrdString: s.lbrycrdString,
Manager: s, AwsS3ID: s.awsS3ID,
LbrycrdString: s.lbrycrdString, AwsS3Secret: s.awsS3Secret,
AwsS3ID: s.awsS3ID, AwsS3Region: s.awsS3Region,
AwsS3Secret: s.awsS3Secret, AwsS3Bucket: s.awsS3Bucket,
AwsS3Region: s.awsS3Region, namer: namer.NewNamer(),
AwsS3Bucket: s.awsS3Bucket, Fee: c.Fee,
namer: namer.NewNamer(), publishAddress: c.PublishAddress,
Fee: c.Fee, transferState: c.TransferState,
publishAddress: c.PublishAddress,
transferState: c.TransferState,
}) })
if q != StatusFailed { if q != StatusFailed {
continue queues continue queues
@ -248,7 +231,7 @@ func (s *SyncManager) Start() error {
break break
} }
} }
if shouldInterruptLoop || s.singleRun { if shouldInterruptLoop || s.SyncFlags.SingleRun {
break break
} }
} }
@ -265,7 +248,7 @@ func (s *SyncManager) checkUsedSpace() error {
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
if usedPctile >= 0.90 && !s.skipSpaceCheck { if usedPctile >= 0.90 && !s.SyncFlags.SkipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
} }
log.Infof("disk usage: %.1f%%", usedPctile*100) log.Infof("disk usage: %.1f%%", usedPctile*100)

View file

@ -95,7 +95,7 @@ func (s *Sync) walletSetup() error {
} }
unallocatedVideos := videosOnYoutube - (publishedCount + failedCount) unallocatedVideos := videosOnYoutube - (publishedCount + failedCount)
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelClaimAmount requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelClaimAmount
if s.Manager.upgradeMetadata { if s.Manager.SyncFlags.UpgradeMetadata {
requiredBalance += float64(notUpgradedCount) * 0.001 requiredBalance += float64(notUpgradedCount) * 0.001
} }

View file

@ -63,33 +63,31 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
// Sync stores the options that control how syncing happens // Sync stores the options that control how syncing happens
type Sync struct { type Sync struct {
APIConfig *sdk.APIConfig APIConfig *sdk.APIConfig
YoutubeChannelID string YoutubeChannelID string
LbryChannelName string LbryChannelName string
StopOnError bool MaxTries int
MaxTries int ConcurrentVideos int
ConcurrentVideos int Refill int
TakeOverExistingChannel bool Manager *SyncManager
Refill int LbrycrdString string
Manager *SyncManager AwsS3ID string
LbrycrdString string AwsS3Secret string
AwsS3ID string AwsS3Region string
AwsS3Secret string AwsS3Bucket string
AwsS3Region string Fee *sdk.Fee
AwsS3Bucket string daemon *jsonrpc.Client
Fee *sdk.Fee claimAddress string
daemon *jsonrpc.Client videoDirectory string
claimAddress string syncedVideosMux *sync.RWMutex
videoDirectory string syncedVideos map[string]sdk.SyncedVideo
syncedVideosMux *sync.RWMutex grp *stop.Group
syncedVideos map[string]sdk.SyncedVideo lbryChannelID string
grp *stop.Group namer *namer.Namer
lbryChannelID string walletMux *sync.RWMutex
namer *namer.Namer queue chan video
walletMux *sync.RWMutex transferState int
queue chan video publishAddress string
transferState int
publishAddress string
} }
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) {
@ -364,7 +362,7 @@ func deleteSyncFolder(videoDirectory string) {
} }
} }
func (s *Sync) shouldTransfer() bool { func (s *Sync) shouldTransfer() bool {
return s.transferState == 1 && s.publishAddress != "" return s.transferState == 1 && s.publishAddress != "" && !s.Manager.SyncFlags.DisableTransfers
} }
func (s *Sync) setChannelTerminationStatus(e *error) { func (s *Sync) setChannelTerminationStatus(e *error) {
var transferState *int var transferState *int
@ -576,11 +574,11 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int
} }
_, ok := videoIDMap[vID] _, ok := videoIDMap[vID]
if !ok && sv.Published { if !ok && sv.Published {
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.removeDBUnpublished) log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.SyncFlags.RemoveDBUnpublished)
idsToRemove = append(idsToRemove, vID) idsToRemove = append(idsToRemove, vID)
} }
} }
if s.Manager.removeDBUnpublished && len(idsToRemove) > 0 { if s.Manager.SyncFlags.RemoveDBUnpublished && len(idsToRemove) > 0 {
log.Infof("removing: %s", strings.Join(idsToRemove, ",")) log.Infof("removing: %s", strings.Join(idsToRemove, ","))
err := s.Manager.apiConfig.DeleteVideos(idsToRemove) err := s.Manager.apiConfig.DeleteVideos(idsToRemove)
if err != nil { if err != nil {
@ -687,7 +685,7 @@ func (s *Sync) doSync() error {
} }
} }
if s.StopOnError { if s.Manager.SyncFlags.StopOnError {
log.Println("Will stop publishing if an error is detected") log.Println("Will stop publishing if an error is detected")
} }
@ -753,7 +751,7 @@ func (s *Sync) startWorker(workerNum int) {
"more than 90% of the space has been used.", "more than 90% of the space has been used.",
"Couldn't find private key for id", "Couldn't find private key for id",
} }
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.SyncFlags.StopOnError {
s.grp.Stop() s.grp.Stop()
} else if s.MaxTries > 1 { } else if s.MaxTries > 1 {
errorsNoRetry := []string{ errorsNoRetry := []string{
@ -979,7 +977,7 @@ func (s *Sync) processVideo(v video) (err error) {
s.syncedVideosMux.RUnlock() s.syncedVideosMux.RUnlock()
newMetadataVersion := int8(2) newMetadataVersion := int8(2)
alreadyPublished := ok && sv.Published alreadyPublished := ok && sv.Published
videoRequiresUpgrade := ok && s.Manager.upgradeMetadata && sv.MetadataVersion < newMetadataVersion videoRequiresUpgrade := ok && s.Manager.SyncFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion
neverRetryFailures := []string{ neverRetryFailures := []string{
"Error extracting sts from embedded url response", "Error extracting sts from embedded url response",

View file

@ -34,6 +34,17 @@ type SyncProperties struct {
YoutubeChannelID string YoutubeChannelID string
} }
type SyncFlags struct {
StopOnError bool
TakeOverExistingChannel bool
SkipSpaceCheck bool
SyncUpdate bool
SingleRun bool
RemoveDBUnpublished bool
UpgradeMetadata bool
DisableTransfers bool
}
type Fee struct { type Fee struct {
Amount string `json:"amount"` Amount string `json:"amount"`
Address string `json:"address"` Address string `json:"address"`