add more startup parameters

This commit is contained in:
Niko Storni 2018-08-01 08:56:04 -04:00
parent b93e15eb09
commit 7a3d2bf4bc
No known key found for this signature in database
GPG key ID: F37FE63398800368
6 changed files with 22 additions and 15 deletions

View file

@ -28,6 +28,8 @@ var (
syncFrom int64 syncFrom int64
syncUntil int64 syncUntil int64
concurrentJobs int concurrentJobs int
videosLimit int
maxVideoSize int
) )
func init() { func init() {
@ -42,12 +44,14 @@ func init() {
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)") ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
ytSyncCmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") ytSyncCmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
ytSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") ytSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
ytSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently (Default: 1)") ytSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
ytSyncCmd.Flags().IntVar(&videosLimit, "videos-limit", 1000, "how many videos to process per channel")
ytSyncCmd.Flags().IntVar(&maxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
RootCmd.AddCommand(ytSyncCmd) RootCmd.AddCommand(ytSyncCmd)
} }
@ -130,6 +134,8 @@ func ytSync(cmd *cobra.Command, args []string) {
ApiURL: apiURL, ApiURL: apiURL,
ApiToken: apiToken, ApiToken: apiToken,
BlobsDir: blobsDir, BlobsDir: blobsDir,
VideosLimit: videosLimit,
MaxVideoSize: maxVideoSize,
} }
err := sm.Start() err := sm.Start()

View file

@ -37,6 +37,8 @@ type SyncManager struct {
ApiURL string ApiURL string
ApiToken string ApiToken string
BlobsDir string BlobsDir string
VideosLimit int
MaxVideoSize int
} }
const ( const (

View file

@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error {
} }
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already published %d videos", numPublished)
if float64(numOnSource)-float64(numPublished) > maximumVideosToPublish { if float64(numOnSource)-float64(numPublished) > float64(s.Manager.VideosLimit) {
numOnSource = maximumVideosToPublish numOnSource = uint64(s.Manager.VideosLimit)
} }
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource { if numPublished > numOnSource {

View file

@ -188,7 +188,7 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {

View file

@ -203,7 +203,7 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) { func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string, maxVideoSize int) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -215,10 +215,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f
if err != nil { if err != nil {
return nil, err return nil, err
} }
if fi.Size() > 2*1024*1024*1024 { if fi.Size() > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error //delete the video and ignore the error
_ = v.delete() _ = v.delete()
return nil, errors.Err("video is bigger than 2GB, skipping for now") return nil, errors.Err("the video is too big to sync, skipping for now")
} }
err = v.triggerThumbnailSave() err = v.triggerThumbnailSave()

View file

@ -32,7 +32,6 @@ import (
const ( const (
channelClaimAmount = 0.01 channelClaimAmount = 0.01
publishAmount = 0.01 publishAmount = 0.01
maximumVideosToPublish = 1000
) )
type video interface { type video interface {
@ -40,7 +39,7 @@ type video interface {
IDAndNum() string IDAndNum() string
PlaylistPosition() int PlaylistPosition() int
PublishedAt() time.Time PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string) (*sources.SyncSummary, error) Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error)
} }
// sorting videos // sorting videos
@ -315,7 +314,7 @@ func (s *Sync) startWorker(workerNum int) {
"Error in daemon: Cannot publish empty file", "Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response", "Error extracting sts from embedded url response",
"Client.Timeout exceeded while awaiting headers)", "Client.Timeout exceeded while awaiting headers)",
"video is bigger than 2GB, skipping for now", "the video is too big to sync, skipping for now",
} }
if util.SubstringInSlice(err.Error(), errorsNoRetry) { if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all") log.Println("This error should not be retried at all")
@ -504,11 +503,11 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
if v.PlaylistPosition() > maximumVideosToPublish { if v.PlaylistPosition() > s.Manager.VideosLimit {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName) summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName, s.Manager.MaxVideoSize)
if err != nil { if err != nil {
return err return err
} }