diff --git a/main.go b/main.go index b36233c..13080eb 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/ytsync/v5/manager" "github.com/lbryio/ytsync/v5/sdk" + "github.com/lbryio/ytsync/v5/shared" ytUtils "github.com/lbryio/ytsync/v5/util" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -23,39 +24,11 @@ var Version string const defaultMaxTries = 3 var ( - flags sdk.SyncFlags - maxTries int - refill int - limit int - syncStatus string - channelID string - syncFrom int64 - syncUntil int64 - concurrentJobs int - videosLimit int - maxVideoSize int + cliFlags shared.SyncFlags maxVideoLength int ) func main() { - //grp := stop.New() - //ipPool, err := ip_manager.GetIPPool(grp) - //if err != nil { - // panic(err) - //} - // - //videoID := "vtIzMaLkCaM" - // - //ip, err := ipPool.GetIP(videoID) - //if err != nil { - // panic(err) - //} - // - //spew.Dump(ip) - // - //spew.Dump(downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)})) - //return - rand.Seed(time.Now().UnixNano()) log.SetLevel(log.DebugLevel) http.Handle("/metrics", promhttp.Handler()) @@ -69,24 +42,24 @@ func main() { Args: cobra.RangeArgs(0, 0), } - cmd.Flags().BoolVar(&flags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") - cmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") - cmd.Flags().BoolVar(&flags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") - cmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") - cmd.Flags().BoolVar(&flags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") - cmd.Flags().BoolVar(&flags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") - cmd.Flags().BoolVar(&flags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") - cmd.Flags().BoolVar(&flags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") - cmd.Flags().BoolVar(&flags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") - cmd.Flags().BoolVar(&flags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports") - cmd.Flags().BoolVar(&flags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube") - cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") - cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") - cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") - cmd.Flags().Int64Var(&syncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") - cmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently") - cmd.Flags().IntVar(&videosLimit, "videos-limit", 1000, "how many videos to process per channel") - cmd.Flags().IntVar(&maxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)") + cmd.Flags().BoolVar(&cliFlags.StopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") + cmd.Flags().IntVar(&cliFlags.MaxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") + cmd.Flags().BoolVar(&cliFlags.TakeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") + cmd.Flags().IntVar(&cliFlags.Limit, "limit", 0, "limit the amount of channels to sync") + cmd.Flags().BoolVar(&cliFlags.SkipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") + cmd.Flags().BoolVar(&cliFlags.SyncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") + cmd.Flags().BoolVar(&cliFlags.SingleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") + cmd.Flags().BoolVar(&cliFlags.RemoveDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") + cmd.Flags().BoolVar(&cliFlags.UpgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version") + cmd.Flags().BoolVar(&cliFlags.DisableTransfers, "no-transfers", false, "Skips the transferring process of videos, channels and supports") + cmd.Flags().BoolVar(&cliFlags.QuickSync, "quick", false, "Look up only the last 50 videos from youtube") + cmd.Flags().StringVar(&cliFlags.SyncStatus, "status", "", "Specify which queue to pull from. Overrides --update") + cmd.Flags().StringVar(&cliFlags.ChannelID, "channelID", "", "If specified, only this channel will be synced.") + cmd.Flags().Int64Var(&cliFlags.SyncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") + cmd.Flags().Int64Var(&cliFlags.SyncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") + cmd.Flags().IntVar(&cliFlags.ConcurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently") + cmd.Flags().IntVar(&cliFlags.VideosLimit, "videos-limit", 1000, "how many videos to process per channel") + cmd.Flags().IntVar(&cliFlags.MaxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)") cmd.Flags().IntVar(&maxVideoLength, "max-length", 2, "Maximum video length to process (in hours)") if err := cmd.Execute(); err != nil { @@ -114,29 +87,30 @@ func ytSync(cmd *cobra.Command, args []string) { util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) } - if syncStatus != "" && !util.InSlice(syncStatus, manager.SyncStatuses) { - log.Errorf("status must be one of the following: %v\n", manager.SyncStatuses) + if cliFlags.SyncStatus != "" && !util.InSlice(cliFlags.SyncStatus, shared.SyncStatuses) { + log.Errorf("status must be one of the following: %v\n", shared.SyncStatuses) return } - if flags.StopOnError && maxTries != defaultMaxTries { + if cliFlags.StopOnError && cliFlags.MaxTries != defaultMaxTries { log.Errorln("--stop-on-error and --max-tries are mutually exclusive") return } - if maxTries < 1 { + if cliFlags.MaxTries < 1 { log.Errorln("setting --max-tries less than 1 doesn't make sense") return } - if limit < 0 { + if cliFlags.Limit < 0 { log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense") return } + cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour apiURL := os.Getenv("LBRY_WEB_API") apiToken := os.Getenv("LBRY_API_TOKEN") youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") - lbrycrdString := os.Getenv("LBRYCRD_STRING") + lbrycrdDsn := os.Getenv("LBRYCRD_STRING") awsS3ID := os.Getenv("AWS_S3_ID") awsS3Secret := os.Getenv("AWS_S3_SECRET") awsS3Region := os.Getenv("AWS_S3_REGION") @@ -169,42 +143,30 @@ func ytSync(cmd *cobra.Command, args []string) { log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") return } - if lbrycrdString == "" { + if lbrycrdDsn == "" { log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") } blobsDir := ytUtils.GetBlobsDir() - syncProperties := &sdk.SyncProperties{ - SyncFrom: syncFrom, - SyncUntil: syncUntil, - YoutubeChannelID: channelID, - } apiConfig := &sdk.APIConfig{ YoutubeAPIKey: youtubeAPIKey, ApiURL: apiURL, ApiToken: apiToken, HostName: hostname, } + awsConfig := &shared.AwsConfigs{ + AwsS3ID: awsS3ID, + AwsS3Secret: awsS3Secret, + AwsS3Region: awsS3Region, + AwsS3Bucket: awsS3Bucket, + } sm := manager.NewSyncManager( - flags, - maxTries, - refill, - limit, - concurrentJobs, - concurrentJobs, + cliFlags, blobsDir, - videosLimit, - maxVideoSize, - lbrycrdString, - awsS3ID, - awsS3Secret, - awsS3Region, - awsS3Bucket, - syncStatus, - syncProperties, + lbrycrdDsn, + awsConfig, apiConfig, - time.Duration(maxVideoLength)*time.Hour, ) err := sm.Start() if err != nil { diff --git a/manager/manager.go b/manager/manager.go index 9af86cb..8a09b77 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -10,94 +10,43 @@ import ( "github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/sdk" + "github.com/lbryio/ytsync/v5/shared" logUtils "github.com/lbryio/ytsync/v5/util" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" log "github.com/sirupsen/logrus" ) type SyncManager struct { - SyncFlags sdk.SyncFlags - maxTries int - refill int - limit int - concurrentJobs int - concurrentVideos int - blobsDir string - videosLimit int - maxVideoSize int - maxVideoLength time.Duration - lbrycrdString string - awsS3ID string - awsS3Secret string - awsS3Region string - syncStatus string - awsS3Bucket string - syncProperties *sdk.SyncProperties - apiConfig *sdk.APIConfig + CliFlags shared.SyncFlags + ApiConfig *sdk.APIConfig + LbrycrdDsn string + AwsConfigs *shared.AwsConfigs + + blobsDir string + channelsToSync []Sync } -func NewSyncManager(syncFlags sdk.SyncFlags, maxTries int, refill int, limit int, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, - maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, - syncStatus string, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength time.Duration) *SyncManager { +func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, awsConfigs *shared.AwsConfigs, apiConfig *sdk.APIConfig) *SyncManager { return &SyncManager{ - SyncFlags: syncFlags, - maxTries: maxTries, - refill: refill, - limit: limit, - concurrentJobs: concurrentJobs, - concurrentVideos: concurrentVideos, - blobsDir: blobsDir, - videosLimit: videosLimit, - maxVideoSize: maxVideoSize, - maxVideoLength: maxVideoLength, - lbrycrdString: lbrycrdString, - awsS3ID: awsS3ID, - awsS3Secret: awsS3Secret, - awsS3Region: awsS3Region, - awsS3Bucket: awsS3Bucket, - syncStatus: syncStatus, - syncProperties: syncProperties, - apiConfig: apiConfig, + CliFlags: cliFlags, + blobsDir: blobsDir, + LbrycrdDsn: lbrycrdDsn, + AwsConfigs: awsConfigs, + ApiConfig: apiConfig, } } - -const ( - StatusPending = "pending" // waiting for permission to sync - StatusPendingEmail = "pendingemail" // permission granted but missing email - StatusQueued = "queued" // in sync queue. will be synced soon - StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon - StatusSyncing = "syncing" // syncing now - StatusSynced = "synced" // done - StatusFailed = "failed" - StatusFinalized = "finalized" // no more changes allowed - StatusAbandoned = "abandoned" // deleted on youtube or banned -) -const LatestMetadataVersion = 2 - -var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned} - -const ( - VideoStatusPublished = "published" - VideoStatusFailed = "failed" - VideoStatusUpgradeFailed = "upgradefailed" - VideoStatusUnpublished = "unpublished" - VideoStatusTranferFailed = "transferfailed" -) - -const ( - TransferStateNotTouched = iota - TransferStatePending - TransferStateComplete - TransferStateManual -) +func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) { + s.channelsToSync = append(s.channelsToSync, Sync{ + DbChannelData: channel, + Manager: s, + namer: namer.NewNamer(), + }) +} func (s *SyncManager) Start() error { - if logUtils.ShouldCleanOnStartup() { err := logUtils.CleanForStartup() if err != nil { @@ -108,119 +57,63 @@ func (s *SyncManager) Start() error { var lastChannelProcessed string syncCount := 0 for { + s.channelsToSync = make([]Sync, 0, 10) // reset sync queue err := s.checkUsedSpace() if err != nil { return errors.Err(err) } - - var syncs []Sync shouldInterruptLoop := false - isSingleChannelSync := s.syncProperties.YoutubeChannelID != "" - if isSingleChannelSync { - channels, err := s.apiConfig.FetchChannels("", s.syncProperties) + if s.CliFlags.IsSingleChannelSync() { + channels, err := s.ApiConfig.FetchChannels("", &s.CliFlags) if err != nil { return errors.Err(err) } if len(channels) != 1 { return errors.Err("Expected 1 channel, %d returned", len(channels)) } - lbryChannelName := channels[0].DesiredChannelName - syncs = make([]Sync, 1) - s.maxVideoLength = time.Duration(channels[0].LengthLimit) * time.Minute - s.maxVideoSize = channels[0].SizeLimit - syncs[0] = Sync{ - APIConfig: s.apiConfig, - YoutubeChannelID: s.syncProperties.YoutubeChannelID, - LbryChannelName: lbryChannelName, - lbryChannelID: channels[0].ChannelClaimID, - MaxTries: s.maxTries, - ConcurrentVideos: s.concurrentVideos, - Refill: s.refill, - Manager: s, - MaxVideoLength: s.maxVideoLength, - LbrycrdString: s.lbrycrdString, - AwsS3ID: s.awsS3ID, - AwsS3Secret: s.awsS3Secret, - AwsS3Region: s.awsS3Region, - AwsS3Bucket: s.awsS3Bucket, - namer: namer.NewNamer(), - Fee: channels[0].Fee, - clientPublishAddress: channels[0].PublishAddress, - publicKey: channels[0].PublicKey, - transferState: channels[0].TransferState, - LastUploadedVideo: channels[0].LastUploadedVideo, - } + s.enqueueChannel(&channels[0]) shouldInterruptLoop = true } else { var queuesToSync []string - if s.syncStatus != "" { - queuesToSync = append(queuesToSync, s.syncStatus) - } else if s.SyncFlags.SyncUpdate { - queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) + if s.CliFlags.SyncStatus != "" { + queuesToSync = append(queuesToSync, s.CliFlags.SyncStatus) + } else if s.CliFlags.SyncUpdate { + queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusSynced) } else { - queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) + queuesToSync = append(queuesToSync, shared.StatusSyncing, shared.StatusQueued) } queues: for _, q := range queuesToSync { - //temporary override for sync-until to give tom the time to review the channels - if q == StatusQueued { - s.syncProperties.SyncUntil = time.Now().Add(-8 * time.Hour).Unix() - } - channels, err := s.apiConfig.FetchChannels(q, s.syncProperties) + channels, err := s.ApiConfig.FetchChannels(q, &s.CliFlags) if err != nil { return err } - for i, c := range channels { - log.Infof("There are %d channels in the \"%s\" queue", len(channels)-i, q) - maxVideoLength := s.maxVideoLength - if c.TotalSubscribers < 1000 { - maxVideoLength = 1 * time.Hour - } - maxVideoLength = time.Duration(c.LengthLimit) * time.Minute - s.maxVideoSize = c.SizeLimit - syncs = append(syncs, Sync{ - APIConfig: s.apiConfig, - YoutubeChannelID: c.ChannelId, - LbryChannelName: c.DesiredChannelName, - lbryChannelID: c.ChannelClaimID, - MaxTries: s.maxTries, - ConcurrentVideos: s.concurrentVideos, - MaxVideoLength: maxVideoLength, - Refill: s.refill, - Manager: s, - LbrycrdString: s.lbrycrdString, - AwsS3ID: s.awsS3ID, - AwsS3Secret: s.awsS3Secret, - AwsS3Region: s.awsS3Region, - AwsS3Bucket: s.awsS3Bucket, - namer: namer.NewNamer(), - Fee: c.Fee, - clientPublishAddress: c.PublishAddress, - publicKey: c.PublicKey, - transferState: c.TransferState, - LastUploadedVideo: c.LastUploadedVideo, - }) - if q != StatusFailed { - continue queues + log.Infof("Currently processing the \"%s\" queue with %d channels", q, len(channels)) + for _, c := range channels { + s.enqueueChannel(&c) + queueAll := q == shared.StatusFailed || q == shared.StatusSyncing + if !queueAll { + break queues } } + log.Infof("Drained the \"%s\" queue", q) } } - if len(syncs) == 0 { + if len(s.channelsToSync) == 0 { log.Infoln("No channels to sync. Pausing 5 minutes!") time.Sleep(5 * time.Minute) } - for _, sync := range syncs { - if lastChannelProcessed == sync.LbryChannelName { - util.SendToSlack("We just killed a sync for %s to stop looping!(%s)", sync.LbryChannelName, sync.YoutubeChannelID) - stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.LbryChannelName) + for _, sync := range s.channelsToSync { + if lastChannelProcessed == sync.DbChannelData.ChannelId { + util.SendToSlack("We just killed a sync for %s to stop looping! (%s)", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId) + stopTheLoops := errors.Err("Found channel %s running twice, set it to failed, and reprocess later", sync.DbChannelData.DesiredChannelName) sync.setChannelTerminationStatus(&stopTheLoops) continue } - lastChannelProcessed = sync.LbryChannelName + lastChannelProcessed = sync.DbChannelData.ChannelId shouldNotCount := false - logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) + logUtils.SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1) err := sync.FullCycle() //TODO: THIS IS A TEMPORARY WORK AROUND FOR THE STUPID IP LOCKUP BUG ipPool, _ := ip_manager.GetIPPool(sync.grp) @@ -255,33 +148,28 @@ func (s *SyncManager) Start() error { if err != nil { return errors.Prefix("@Nikooo777 something went wrong while reflecting blobs", err) } - logUtils.SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1) + logUtils.SendInfoToSlack("%s (%s) reached an end. Total processed channels since startup: %d", sync.DbChannelData.DesiredChannelName, sync.DbChannelData.ChannelId, syncCount+1) if !shouldNotCount { syncCount++ } - if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) { + if sync.IsInterrupted() || (s.CliFlags.Limit != 0 && syncCount >= s.CliFlags.Limit) { shouldInterruptLoop = true break } } - if shouldInterruptLoop || s.SyncFlags.SingleRun { + if shouldInterruptLoop || s.CliFlags.SingleRun { break } } return nil } -func (s *SyncManager) GetS3AWSConfig() aws.Config { - return aws.Config{ - Credentials: credentials.NewStaticCredentials(s.awsS3ID, s.awsS3Secret, ""), - Region: &s.awsS3Region, - } -} + func (s *SyncManager) checkUsedSpace() error { usedPctile, err := GetUsedSpace(logUtils.GetBlobsDir()) if err != nil { return errors.Err(err) } - if usedPctile >= 0.90 && !s.SyncFlags.SkipSpaceCheck { + if usedPctile >= 0.90 && !s.CliFlags.SkipSpaceCheck { return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) } log.Infof("disk usage: %.1f%%", usedPctile*100) diff --git a/manager/s3_storage.go b/manager/s3_storage.go index 6c8e551..8b78c89 100644 --- a/manager/s3_storage.go +++ b/manager/s3_storage.go @@ -6,7 +6,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -17,8 +16,7 @@ import ( ) func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) { - creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") - s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig()) if err != nil { return nil, errors.Prefix("error starting session: ", err) } @@ -26,8 +24,7 @@ func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) { return downloader, nil } func (s *Sync) getS3Uploader() (*s3manager.Uploader, error) { - creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") - s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) + s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig()) if err != nil { return nil, errors.Prefix("error starting session: ", err) } @@ -51,7 +48,7 @@ func (s *Sync) downloadWallet() error { defer out.Close() bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ - Bucket: aws.String(s.AwsS3Bucket), + Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Key: key, }) if err != nil { @@ -112,7 +109,7 @@ func (s *Sync) downloadBlockchainDB() error { defer out.Close() bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ - Bucket: aws.String(s.AwsS3Bucket), + Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Key: key, }) if err != nil { @@ -146,11 +143,11 @@ func (s *Sync) downloadBlockchainDB() error { func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string, err error) { defaultWallet = os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" tempWallet = os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" - key = aws.String("/wallets/" + s.YoutubeChannelID) + key = aws.String("/wallets/" + s.DbChannelData.ChannelId) if logUtils.IsRegTest() { defaultWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" tempWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" - key = aws.String("/regtest/" + s.YoutubeChannelID) + key = aws.String("/regtest/" + s.DbChannelData.ChannelId) } lbryumDir := os.Getenv("LBRYUM_DIR") @@ -176,20 +173,20 @@ func (s *Sync) getBlockchainDBPaths() (defaultDB, tempDB string, key *string, er } defaultDB = lbryumDir + "/lbc_mainnet/blockchain.db" tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.db" - key = aws.String("/blockchain_dbs/" + s.YoutubeChannelID) + key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId) if logUtils.IsRegTest() { defaultDB = lbryumDir + "/lbc_regtest/blockchain.db" tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.db" - key = aws.String("/regtest_dbs/" + s.YoutubeChannelID) + key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId) } return } func (s *Sync) uploadWallet() error { defaultWalletDir := logUtils.GetDefaultWalletPath() - key := aws.String("/wallets/" + s.YoutubeChannelID) + key := aws.String("/wallets/" + s.DbChannelData.ChannelId) if logUtils.IsRegTest() { - key = aws.String("/regtest/" + s.YoutubeChannelID) + key = aws.String("/regtest/" + s.DbChannelData.ChannelId) } if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) { @@ -208,7 +205,7 @@ func (s *Sync) uploadWallet() error { defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.AwsS3Bucket), + Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Key: key, Body: file, }) @@ -241,7 +238,7 @@ func (s *Sync) uploadBlockchainDB() error { defer file.Close() _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.AwsS3Bucket), + Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket), Key: key, Body: file, }) diff --git a/manager/setup.go b/manager/setup.go index af2eaad..ec1e771 100644 --- a/manager/setup.go +++ b/manager/setup.go @@ -9,6 +9,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/timing" logUtils "github.com/lbryio/ytsync/v5/util" "github.com/lbryio/ytsync/v5/ytapi" @@ -72,7 +73,7 @@ func (s *Sync) walletSetup() error { } log.Debugf("Starting balance is %.4f", balance) - videosOnYoutube, err := ytapi.CountVideosInChannel(s.YoutubeChannelID) + videosOnYoutube, err := ytapi.CountVideosInChannel(s.DbChannelData.ChannelId) if err != nil { return err } @@ -100,17 +101,17 @@ func (s *Sync) walletSetup() error { log.Debugf("We already allocated credits for %d published videos and %d failed videos", publishedCount, failedCount) - if videosOnYoutube > s.Manager.videosLimit { - videosOnYoutube = s.Manager.videosLimit + if videosOnYoutube > s.Manager.CliFlags.VideosLimit { + videosOnYoutube = s.Manager.CliFlags.VideosLimit } unallocatedVideos := videosOnYoutube - (publishedCount + failedCount) channelFee := channelClaimAmount - channelAlreadyClaimed := s.lbryChannelID != "" + channelAlreadyClaimed := s.DbChannelData.ChannelClaimID != "" if channelAlreadyClaimed { channelFee = 0.0 } requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelFee - if s.Manager.SyncFlags.UpgradeMetadata { + if s.Manager.CliFlags.UpgradeMetadata { requiredBalance += float64(notUpgradedCount) * 0.001 } @@ -119,8 +120,8 @@ func (s *Sync) walletSetup() error { refillAmount = math.Max(math.Max(requiredBalance-balance, minimumAccountBalance-balance), minimumRefillAmount) } - if s.Refill > 0 { - refillAmount += float64(s.Refill) + if s.Manager.CliFlags.Refill > 0 { + refillAmount += float64(s.Manager.CliFlags.Refill) } if refillAmount > 0 { @@ -136,12 +137,11 @@ func (s *Sync) walletSetup() error { } else if claimAddress == nil { return errors.Err("could not get an address") } - s.claimAddress = string(claimAddress.Items[0].Address) - if s.claimAddress == "" { - return errors.Err("found blank claim address") + if s.DbChannelData.PublishAddress == "" || !s.shouldTransfer() { + s.DbChannelData.PublishAddress = string(claimAddress.Items[0].Address) } - if s.shouldTransfer() { - s.claimAddress = s.clientPublishAddress + if s.DbChannelData.PublishAddress == "" { + return errors.Err("found blank claim address") } err = s.ensureEnoughUTXOs() @@ -300,7 +300,7 @@ func (s *Sync) waitForNewBlock() error { } func (s *Sync) GenerateRegtestBlock() error { - lbrycrd, err := logUtils.GetLbrycrdClient(s.LbrycrdString) + lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn) if err != nil { return errors.Prefix("error getting lbrycrd client: ", err) } @@ -319,7 +319,7 @@ func (s *Sync) GenerateRegtestBlock() error { func (s *Sync) ensureChannelOwnership() error { defer func(start time.Time) { timing.TimedComponent("ensureChannelOwnership").Add(time.Since(start)) }(time.Now()) - if s.LbryChannelName == "" { + if s.DbChannelData.DesiredChannelName == "" { return errors.Err("no channel name set") } @@ -332,27 +332,27 @@ func (s *Sync) ensureChannelOwnership() error { var channelToUse *jsonrpc.Transaction if len((*channels).Items) > 0 { - if s.lbryChannelID == "" { + if s.DbChannelData.ChannelClaimID == "" { return errors.Err("this channel does not have a recorded claimID in the database. To prevent failures, updates are not supported until an entry is manually added in the database") } for _, c := range (*channels).Items { log.Debugf("checking listed channel %s (%s)", c.ClaimID, c.Name) - if c.ClaimID != s.lbryChannelID { + if c.ClaimID != s.DbChannelData.ChannelClaimID { continue } - if c.Name != s.LbryChannelName { + if c.Name != s.DbChannelData.DesiredChannelName { return errors.Err("the channel in the wallet is different than the channel in the database") } channelToUse = &c break } if channelToUse == nil { - return errors.Err("this wallet has channels but not a single one is ours! Expected claim_id: %s (%s)", s.lbryChannelID, s.LbryChannelName) + return errors.Err("this wallet has channels but not a single one is ours! Expected claim_id: %s (%s)", s.DbChannelData.ChannelClaimID, s.DbChannelData.DesiredChannelName) } - } else if s.transferState == TransferStateComplete { + } else if s.DbChannelData.TransferState == shared.TransferStateComplete { return errors.Err("the channel was transferred but appears to have been abandoned!") - } else if s.lbryChannelID != "" { - return errors.Err("the database has a channel recorded (%s) but nothing was found in our control", s.lbryChannelID) + } else if s.DbChannelData.ChannelClaimID != "" { + return errors.Err("the database has a channel recorded (%s) but nothing was found in our control", s.DbChannelData.ChannelClaimID) } channelUsesOldMetadata := false @@ -383,20 +383,23 @@ func (s *Sync) ensureChannelOwnership() error { } } - channelInfo, err := ytapi.ChannelInfo(s.YoutubeChannelID) + channelInfo, err := ytapi.ChannelInfo(s.DbChannelData.ChannelId) if err != nil { return err } thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL - thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.YoutubeChannelID, s.Manager.GetS3AWSConfig()) + thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig()) if err != nil { return err } var bannerURL *string if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil { - bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL, "banner-"+s.YoutubeChannelID, s.Manager.GetS3AWSConfig()) + bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL, + "banner-"+s.DbChannelData.ChannelId, + *s.Manager.AwsConfigs.GetS3AWSConfig(), + ) if err != nil { return err } @@ -419,14 +422,14 @@ func (s *Sync) ensureChannelOwnership() error { claimCreateOptions := jsonrpc.ClaimCreateOptions{ Title: &channelInfo.Microformat.MicroformatDataRenderer.Title, Description: &channelInfo.Microformat.MicroformatDataRenderer.Description, - Tags: tags_manager.GetTagsForChannel(s.YoutubeChannelID), + Tags: tags_manager.GetTagsForChannel(s.DbChannelData.ChannelId), Languages: languages, Locations: locations, ThumbnailURL: &thumbnailURL, } if channelUsesOldMetadata { - if s.transferState <= 1 { - c, err = s.daemon.ChannelUpdate(s.lbryChannelID, jsonrpc.ChannelUpdateOptions{ + if s.DbChannelData.TransferState <= 1 { + c, err = s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, jsonrpc.ChannelUpdateOptions{ ClearTags: util.PtrToBool(true), ClearLocations: util.PtrToBool(true), ClearLanguages: util.PtrToBool(true), @@ -436,11 +439,11 @@ func (s *Sync) ensureChannelOwnership() error { }, }) } else { - logUtils.SendInfoToSlack("%s (%s) has a channel with old metadata but isn't in our control anymore. Ignoring", s.LbryChannelName, s.lbryChannelID) + logUtils.SendInfoToSlack("%s (%s) has a channel with old metadata but isn't in our control anymore. Ignoring", s.DbChannelData.DesiredChannelName, s.DbChannelData.ChannelClaimID) return nil } } else { - c, err = s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{ + c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{ ClaimCreateOptions: claimCreateOptions, CoverURL: bannerURL, }) @@ -450,8 +453,8 @@ func (s *Sync) ensureChannelOwnership() error { return err } - s.lbryChannelID = c.Outputs[0].ClaimID - return s.Manager.apiConfig.SetChannelClaimID(s.YoutubeChannelID, s.lbryChannelID) + s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID + return s.Manager.ApiConfig.SetChannelClaimID(s.DbChannelData.ChannelId, s.DbChannelData.ChannelClaimID) } func (s *Sync) addCredits(amountToAdd float64) error { @@ -460,7 +463,7 @@ func (s *Sync) addCredits(amountToAdd float64) error { timing.TimedComponent("addCredits").Add(time.Since(start)) }(start) log.Printf("Adding %f credits", amountToAdd) - lbrycrdd, err := logUtils.GetLbrycrdClient(s.LbrycrdString) + lbrycrdd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn) if err != nil { return err } diff --git a/manager/transfer.go b/manager/transfer.go index 1fdd8fb..d04272d 100644 --- a/manager/transfer.go +++ b/manager/transfer.go @@ -10,7 +10,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/lbryio/ytsync/v5/sdk" + "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/timing" log "github.com/sirupsen/logrus" @@ -97,7 +97,7 @@ func abandonSupports(s *Sync) (float64, error) { //TODO: remove this once the SDK team fixes their RPC bugs.... s.daemon.SetRPCTimeout(60 * time.Second) defer s.daemon.SetRPCTimeout(5 * time.Minute) - for i := 0; i < s.ConcurrentVideos; i++ { + for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ { consumerWG.Add(1) go func() { defer consumerWG.Done() @@ -189,7 +189,7 @@ func abandonSupports(s *Sync) (float64, error) { type updateInfo struct { ClaimID string streamUpdateOptions *jsonrpc.StreamUpdateOptions - videoStatus *sdk.VideoStatus + videoStatus *shared.VideoStatus } func transferVideos(s *Sync) error { @@ -199,7 +199,7 @@ func transferVideos(s *Sync) error { }(start) cleanTransfer := true - streamChan := make(chan updateInfo, s.ConcurrentVideos) + streamChan := make(chan updateInfo, s.Manager.CliFlags.ConcurrentJobs) account, err := s.getDefaultAccount() if err != nil { return err @@ -213,13 +213,13 @@ func transferVideos(s *Sync) error { go func() { defer producerWG.Done() for _, video := range s.syncedVideos { - if !video.Published || video.Transferred || video.MetadataVersion != LatestMetadataVersion { + if !video.Published || video.Transferred || video.MetadataVersion != shared.LatestMetadataVersion { continue } var stream *jsonrpc.Claim = nil for _, c := range streams.Items { - if c.ClaimID != video.ClaimID || (c.SigningChannel != nil && c.SigningChannel.ClaimID != s.lbryChannelID) { + if c.ClaimID != video.ClaimID || (c.SigningChannel != nil && c.SigningChannel.ClaimID != s.DbChannelData.ChannelClaimID) { continue } stream = &c @@ -232,7 +232,7 @@ func transferVideos(s *Sync) error { streamUpdateOptions := jsonrpc.StreamUpdateOptions{ StreamCreateOptions: &jsonrpc.StreamCreateOptions{ ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ - ClaimAddress: &s.clientPublishAddress, + ClaimAddress: &s.DbChannelData.PublishAddress, FundingAccountIDs: []string{ account, }, @@ -240,12 +240,12 @@ func transferVideos(s *Sync) error { }, Bid: util.PtrToString("0.005"), // Todo - Dont hardcode } - videoStatus := sdk.VideoStatus{ - ChannelID: s.YoutubeChannelID, + videoStatus := shared.VideoStatus{ + ChannelID: s.DbChannelData.ChannelId, VideoID: video.VideoID, ClaimID: video.ClaimID, ClaimName: video.ClaimName, - Status: VideoStatusPublished, + Status: shared.VideoStatusPublished, IsTransferred: util.PtrToBool(true), } streamChan <- updateInfo{ @@ -257,7 +257,7 @@ func transferVideos(s *Sync) error { }() consumerWG := &stop.Group{} - for i := 0; i < s.ConcurrentVideos; i++ { + for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ { consumerWG.Add(1) go func(worker int) { defer consumerWG.Done() @@ -290,13 +290,13 @@ func (s *Sync) streamUpdate(ui *updateInfo) error { timing.TimedComponent("transferStreamUpdate").Add(time.Since(start)) if updateError != nil { ui.videoStatus.FailureReason = updateError.Error() - ui.videoStatus.Status = VideoStatusTranferFailed + ui.videoStatus.Status = shared.VideoStatusTranferFailed ui.videoStatus.IsTransferred = util.PtrToBool(false) } else { ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0) } log.Infof("TRANSFERRED %t", *ui.videoStatus.IsTransferred) - statusErr := s.APIConfig.MarkVideoStatus(*ui.videoStatus) + statusErr := s.Manager.ApiConfig.MarkVideoStatus(*ui.videoStatus) if statusErr != nil { return errors.Prefix(statusErr.Error(), updateError) } @@ -318,7 +318,7 @@ func transferChannel(s *Sync) error { } var channelClaim *jsonrpc.Transaction = nil for _, c := range channelClaims.Items { - if c.ClaimID != s.lbryChannelID { + if c.ClaimID != s.DbChannelData.ChannelClaimID { continue } channelClaim = &c @@ -332,11 +332,11 @@ func transferChannel(s *Sync) error { Bid: util.PtrToString(fmt.Sprintf("%.6f", channelClaimAmount-0.005)), ChannelCreateOptions: jsonrpc.ChannelCreateOptions{ ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ - ClaimAddress: &s.clientPublishAddress, + ClaimAddress: &s.DbChannelData.PublishAddress, }, }, } - result, err := s.daemon.ChannelUpdate(s.lbryChannelID, updateOptions) + result, err := s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, updateOptions) if err != nil { return errors.Err(err) } diff --git a/manager/ytsync.go b/manager/ytsync.go index b1611d3..dad8020 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -15,6 +15,7 @@ import ( "github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/sdk" + "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/sources" "github.com/lbryio/ytsync/v5/thumbs" "github.com/lbryio/ytsync/v5/timing" @@ -40,35 +41,18 @@ const ( // Sync stores the options that control how syncing happens type Sync struct { - APIConfig *sdk.APIConfig - YoutubeChannelID string - LbryChannelName string - MaxTries int - ConcurrentVideos int - Refill int - Manager *SyncManager - LbrycrdString string - AwsS3ID string - AwsS3Secret string - AwsS3Region string - AwsS3Bucket string - Fee *sdk.Fee - daemon *jsonrpc.Client - claimAddress string - videoDirectory string - syncedVideosMux *sync.RWMutex - syncedVideos map[string]sdk.SyncedVideo - grp *stop.Group - lbryChannelID string - namer *namer.Namer - walletMux *sync.RWMutex - queue chan ytapi.Video - transferState int - clientPublishAddress string - publicKey string - defaultAccountID string - MaxVideoLength time.Duration - LastUploadedVideo string + DbChannelData *shared.YoutubeChannel + Manager *SyncManager + + daemon *jsonrpc.Client + videoDirectory string + syncedVideosMux *sync.RWMutex + syncedVideos map[string]sdk.SyncedVideo + grp *stop.Group + namer *namer.Namer + walletMux *sync.RWMutex + queue chan ytapi.Video + defaultAccountID string } func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) { @@ -96,7 +80,7 @@ func (s *Sync) IsInterrupted() bool { } func (s *Sync) setStatusSyncing() error { - syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "", nil) + syncedVideos, claimNames, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSyncing, "", nil) if err != nil { return err } @@ -107,24 +91,12 @@ func (s *Sync) setStatusSyncing() error { return nil } -func (s *Sync) setExceptions() { - if s.YoutubeChannelID == "UCwjQfNRW6sGYb__pd7d4nUg" { //@FreeTalkLive - s.MaxVideoLength = 9999 * time.Hour // skips max length checks - s.Manager.maxVideoSize = 0 - } -} - var stopGroup = stop.New() func (s *Sync) FullCycle() (e error) { if os.Getenv("HOME") == "" { return errors.Err("no $HOME env var found") } - if s.YoutubeChannelID == "" { - return errors.Err("channel ID not provided") - } - - s.setExceptions() defer timing.ClearTimings() s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.RWMutex{} @@ -201,7 +173,7 @@ func (s *Sync) FullCycle() (e error) { func (s *Sync) processTransfers() (e error) { log.Println("Processing transfers") - if s.transferState != 2 { + if s.DbChannelData.TransferState != 2 { err := waitConfirmations(s) if err != nil { return err @@ -212,7 +184,7 @@ func (s *Sync) processTransfers() (e error) { return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err) } if supportAmount > 0 { - logUtils.SendInfoToSlack("(%s) %.6f LBCs were abandoned and should be used as support", s.YoutubeChannelID, supportAmount) + logUtils.SendInfoToSlack("(%s) %.6f LBCs were abandoned and should be used as support", s.DbChannelData.ChannelId, supportAmount) } err = transferVideos(s) if err != nil { @@ -233,14 +205,14 @@ func (s *Sync) processTransfers() (e error) { return err } isTip := true - summary, err := s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil) + summary, err := s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil) if err != nil { if strings.Contains(err.Error(), "tx-size") { //TODO: this is a silly workaround and should be written in an recursive function - summary, err = s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil) + summary, err = s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil) if err != nil { return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err) } - summary, err = s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil) + summary, err = s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount/2.0), &isTip, nil, []string{defaultAccount}, nil) if err != nil { return errors.Err(err) } @@ -267,7 +239,7 @@ func deleteSyncFolder(videoDirectory string) { } func (s *Sync) shouldTransfer() bool { - return s.transferState >= 1 && s.clientPublishAddress != "" && !s.Manager.SyncFlags.DisableTransfers + return s.DbChannelData.TransferState >= 1 && s.DbChannelData.PublishAddress != "" && !s.Manager.CliFlags.DisableTransfers } func (s *Sync) setChannelTerminationStatus(e *error) { @@ -275,7 +247,7 @@ func (s *Sync) setChannelTerminationStatus(e *error) { if s.shouldTransfer() { if *e == nil { - transferState = util.PtrToInt(TransferStateComplete) + transferState = util.PtrToInt(shared.TransferStateComplete) } } if *e != nil { @@ -288,13 +260,13 @@ func (s *Sync) setChannelTerminationStatus(e *error) { return } failureReason := (*e).Error() - _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason, transferState) + _, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusFailed, failureReason, transferState) if err != nil { - msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName) + msg := fmt.Sprintf("Failed setting failed state for channel %s", s.DbChannelData.DesiredChannelName) *e = errors.Prefix(msg+err.Error(), *e) } } else if !s.IsInterrupted() { - _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "", transferState) + _, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSynced, "", transferState) if err != nil { *e = err } @@ -397,7 +369,7 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { abandonedClaims := false videoIDs := make(map[string]jsonrpc.Claim) for _, c := range claims { - if !isYtsyncClaim(c, s.lbryChannelID) { + if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) { continue } tn := c.Value.GetThumbnail().GetUrl() @@ -415,7 +387,7 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { claimToAbandon = cl videoIDs[videoID] = c } - if claimToAbandon.Address != s.clientPublishAddress && !s.syncedVideos[videoID].Transferred { + if claimToAbandon.Address != s.DbChannelData.PublishAddress && !s.syncedVideos[videoID].Transferred { log.Debugf("abandoning %+v", claimToAbandon) _, err := s.daemon.StreamAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, false) if err != nil { @@ -444,7 +416,7 @@ type ytsyncClaim struct { func (s *Sync) mapFromClaims(claims []jsonrpc.Claim) map[string]ytsyncClaim { videoIDMap := make(map[string]ytsyncClaim, len(claims)) for _, c := range claims { - if !isYtsyncClaim(c, s.lbryChannelID) { + if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) { continue } tn := c.Value.GetThumbnail().GetUrl() @@ -513,10 +485,10 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim) } fixed++ log.Debugf("updating %s in the database", videoID) - err = s.Manager.apiConfig.MarkVideoStatus(sdk.VideoStatus{ - ChannelID: s.YoutubeChannelID, + err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{ + ChannelID: s.DbChannelData.ChannelId, VideoID: videoID, - Status: VideoStatusPublished, + Status: shared.VideoStatusPublished, ClaimID: chainInfo.ClaimID, ClaimName: chainInfo.ClaimName, Size: util.PtrToInt64(int64(claimSize)), @@ -562,13 +534,13 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim) } _, ok := ownClaimsInfo[vID] if !ok && sv.Published { - log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.SyncFlags.RemoveDBUnpublished) + log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.CliFlags.RemoveDBUnpublished) idsToRemove = append(idsToRemove, vID) } } - if s.Manager.SyncFlags.RemoveDBUnpublished && len(idsToRemove) > 0 { + if s.Manager.CliFlags.RemoveDBUnpublished && len(idsToRemove) > 0 { log.Infof("removing: %s", strings.Join(idsToRemove, ",")) - err := s.Manager.apiConfig.DeleteVideos(idsToRemove) + err := s.Manager.ApiConfig.DeleteVideos(idsToRemove) if err != nil { return count, fixed, len(idsToRemove), err } @@ -599,7 +571,7 @@ func (s *Sync) getClaims(defaultOnly bool) ([]jsonrpc.Claim, error) { } items := make([]jsonrpc.Claim, 0, len(claims.Items)) for _, c := range claims.Items { - if c.SigningChannel != nil && c.SigningChannel.ClaimID == s.lbryChannelID { + if c.SigningChannel != nil && c.SigningChannel.ClaimID == s.DbChannelData.ChannelClaimID { items = append(items, c) } } @@ -656,11 +628,11 @@ func (s *Sync) checkIntegrity() error { } if pubsOnWallet > pubsOnDB { //This case should never happen - logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId) return errors.Err("not all published videos are in the database") } if pubsOnWallet < pubsOnDB { - logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId) } _, err = s.getUnsentSupports() //TODO: use the returned value when it works @@ -693,24 +665,24 @@ func (s *Sync) doSync() error { return err } - if s.transferState < TransferStateComplete { - cert, err := s.daemon.ChannelExport(s.lbryChannelID, nil, nil) + if s.DbChannelData.TransferState < shared.TransferStateComplete { + cert, err := s.daemon.ChannelExport(s.DbChannelData.ChannelClaimID, nil, nil) if err != nil { return errors.Prefix("error getting channel cert", err) } if cert != nil { - err = s.APIConfig.SetChannelCert(string(*cert), s.lbryChannelID) + err = s.Manager.ApiConfig.SetChannelCert(string(*cert), s.DbChannelData.ChannelClaimID) if err != nil { return errors.Prefix("error setting channel cert", err) } } } - if s.Manager.SyncFlags.StopOnError { + if s.Manager.CliFlags.StopOnError { log.Println("Will stop publishing if an error is detected") } - for i := 0; i < s.ConcurrentVideos; i++ { + for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ { s.grp.Add(1) go func(i int) { defer s.grp.Done() @@ -718,7 +690,7 @@ func (s *Sync) doSync() error { }(i) } - if s.LbryChannelName == "@UCBerkeley" { + if s.DbChannelData.DesiredChannelName == "@UCBerkeley" { err = errors.Err("UCB is not supported in this version of YTSYNC") } else { err = s.enqueueYoutubeVideos() @@ -780,9 +752,9 @@ func (s *Sync) startWorker(workerNum int) { "Couldn't find private key for id", "You already have a stream claim published under the name", } - if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.SyncFlags.StopOnError { + if util.SubstringInSlice(err.Error(), fatalErrors) || s.Manager.CliFlags.StopOnError { s.grp.Stop() - } else if s.MaxTries > 1 { + } else if s.Manager.CliFlags.MaxTries > 1 { errorsNoRetry := []string{ "non 200 status code received", "This video contains content from", @@ -812,7 +784,7 @@ func (s *Sync) startWorker(workerNum int) { } if util.SubstringInSlice(err.Error(), errorsNoRetry) { log.Println("This error should not be retried at all") - } else if tryCount < s.MaxTries { + } else if tryCount < s.Manager.CliFlags.MaxTries { if util.SubstringInSlice(err.Error(), []string{ "txn-mempool-conflict", "too-long-mempool-chain", @@ -861,14 +833,14 @@ func (s *Sync) startWorker(workerNum int) { existingClaimSize = existingClaim.Size } } - videoStatus := VideoStatusFailed + videoStatus := shared.VideoStatusFailed if strings.Contains(err.Error(), "upgrade failed") { - videoStatus = VideoStatusUpgradeFailed + videoStatus = shared.VideoStatusUpgradeFailed } else { s.AppendSyncedVideo(v.ID(), false, err.Error(), existingClaimName, existingClaimID, 0, existingClaimSize) } - err = s.Manager.apiConfig.MarkVideoStatus(sdk.VideoStatus{ - ChannelID: s.YoutubeChannelID, + err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{ + ChannelID: s.DbChannelData.ChannelId, VideoID: v.ID(), Status: videoStatus, ClaimID: existingClaimID, @@ -893,12 +865,12 @@ func (s *Sync) enqueueYoutubeVideos() error { return err } - videos, err := ytapi.GetVideosToSync(s.APIConfig, s.YoutubeChannelID, s.syncedVideos, s.Manager.SyncFlags.QuickSync, s.Manager.videosLimit, ytapi.VideoParams{ + videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosLimit, ytapi.VideoParams{ VideoDir: s.videoDirectory, - S3Config: s.Manager.GetS3AWSConfig(), + S3Config: *s.Manager.AwsConfigs.GetS3AWSConfig(), Stopper: s.grp, IPPool: ipPool, - }, s.LastUploadedVideo) + }, s.DbChannelData.LastUploadedVideo) if err != nil { return err } @@ -944,7 +916,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) { s.syncedVideosMux.RUnlock() newMetadataVersion := int8(2) alreadyPublished := ok && sv.Published - videoRequiresUpgrade := ok && s.Manager.SyncFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion + videoRequiresUpgrade := ok && s.Manager.CliFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion neverRetryFailures := []string{ "Error extracting sts from embedded url response", @@ -972,7 +944,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) { return nil } - if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.videosLimit { + if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.CliFlags.VideosLimit { log.Println(v.ID() + " is old: skipping") return nil } @@ -985,13 +957,13 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) { return err } sp := sources.SyncParams{ - ClaimAddress: s.claimAddress, + ClaimAddress: s.DbChannelData.PublishAddress, Amount: publishAmount, - ChannelID: s.lbryChannelID, - MaxVideoSize: s.Manager.maxVideoSize, + ChannelID: s.DbChannelData.ChannelClaimID, + MaxVideoSize: s.DbChannelData.SizeLimit, Namer: s.namer, - MaxVideoLength: s.MaxVideoLength, - Fee: s.Fee, + MaxVideoLength: time.Duration(s.DbChannelData.LengthLimit) * time.Minute, + Fee: s.DbChannelData.Fee, DefaultAccount: da, } @@ -1001,14 +973,14 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) { } s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size()) - err = s.Manager.apiConfig.MarkVideoStatus(sdk.VideoStatus{ - ChannelID: s.YoutubeChannelID, + err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{ + ChannelID: s.DbChannelData.ChannelId, VideoID: v.ID(), - Status: VideoStatusPublished, + Status: shared.VideoStatusPublished, ClaimID: summary.ClaimID, ClaimName: summary.ClaimName, Size: v.Size(), - MetaDataVersion: LatestMetadataVersion, + MetaDataVersion: shared.LatestMetadataVersion, IsTransferred: util.PtrToBool(s.shouldTransfer()), }) if err != nil { @@ -1019,7 +991,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) { } func (s *Sync) importPublicKey() error { - if s.publicKey != "" { + if s.DbChannelData.PublicKey != "" { accountsResponse, err := s.daemon.AccountList(1, 50) if err != nil { return errors.Err(err) @@ -1030,13 +1002,13 @@ func (s *Sync) importPublicKey() error { } for _, a := range accountsResponse.Items { if *a.Ledger == ledger { - if a.PublicKey == s.publicKey { + if a.PublicKey == s.DbChannelData.PublicKey { return nil } } } - log.Infof("Could not find public key %s in the wallet. Importing it...", s.publicKey) - _, err = s.daemon.AccountAdd(s.LbryChannelName, nil, nil, &s.publicKey, util.PtrToBool(true), nil) + log.Infof("Could not find public key %s in the wallet. Importing it...", s.DbChannelData.PublicKey) + _, err = s.daemon.AccountAdd(s.DbChannelData.DesiredChannelName, nil, nil, &s.DbChannelData.PublicKey, util.PtrToBool(true), nil) return errors.Err(err) } return nil @@ -1048,7 +1020,7 @@ func (s *Sync) getUnsentSupports() (float64, error) { if err != nil { return 0, errors.Err(err) } - if s.transferState == 2 { + if s.DbChannelData.TransferState == 2 { balance, err := s.daemon.AccountBalance(&defaultAccount) if err != nil { return 0, err @@ -1079,8 +1051,8 @@ func (s *Sync) getUnsentSupports() (float64, error) { } } } - if balanceAmount > 10 && sentSupports < 1 { - logUtils.SendErrorToSlack("(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", s.YoutubeChannelID, balanceAmount, sentSupports) + if balanceAmount > 10 && sentSupports < 1 && s.DbChannelData.TransferState > 1 { + logUtils.SendErrorToSlack("(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", s.DbChannelData.ChannelId, balanceAmount, sentSupports) return balanceAmount - 10, nil } } diff --git a/sdk/api.go b/sdk/api.go index c542cc8..4d15ae6 100644 --- a/sdk/api.go +++ b/sdk/api.go @@ -13,6 +13,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/null" + "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/util" @@ -30,59 +31,21 @@ type APIConfig struct { HostName string } -type SyncProperties struct { - SyncFrom int64 - SyncUntil int64 - YoutubeChannelID string -} - -type SyncFlags struct { - StopOnError bool - TakeOverExistingChannel bool - SkipSpaceCheck bool - SyncUpdate bool - SingleRun bool - RemoveDBUnpublished bool - UpgradeMetadata bool - DisableTransfers bool - QuickSync bool -} - -type Fee struct { - Amount string `json:"amount"` - Address string `json:"address"` - Currency string `json:"currency"` -} -type YoutubeChannel struct { - ChannelId string `json:"channel_id"` - TotalVideos uint `json:"total_videos"` - TotalSubscribers uint `json:"total_subscribers"` - DesiredChannelName string `json:"desired_channel_name"` - Fee *Fee `json:"fee"` - ChannelClaimID string `json:"channel_claim_id"` - TransferState int `json:"transfer_state"` - PublishAddress string `json:"publish_address"` - PublicKey string `json:"public_key"` - LengthLimit int `json:"length_limit"` - SizeLimit int `json:"size_limit"` - LastUploadedVideo string `json:"last_uploaded_video"` -} - -func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) { +func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) { type apiJobsResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []YoutubeChannel `json:"data"` + Success bool `json:"success"` + Error null.String `json:"error"` + Data []shared.YoutubeChannel `json:"data"` } endpoint := a.ApiURL + "/yt/jobs" res, err := http.PostForm(endpoint, url.Values{ "auth_token": {a.ApiToken}, "sync_status": {status}, "min_videos": {strconv.Itoa(1)}, - "after": {strconv.Itoa(int(cp.SyncFrom))}, - "before": {strconv.Itoa(int(cp.SyncUntil))}, + "after": {strconv.Itoa(int(cliFlags.SyncFrom))}, + "before": {strconv.Itoa(int(cliFlags.SyncUntil))}, "sync_server": {a.HostName}, - "channel_id": {cp.YoutubeChannelID}, + "channel_id": {cliFlags.ChannelID}, }) if err != nil { return nil, errors.Err(err) @@ -93,7 +56,7 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC util.SendErrorToSlack("Error %d while trying to call %s. Waiting to retry", res.StatusCode, endpoint) log.Debugln(string(body)) time.Sleep(30 * time.Second) - return a.FetchChannels(status, cp) + return a.FetchChannels(status, cliFlags) } var response apiJobsResponse err = json.Unmarshal(body, &response) @@ -129,7 +92,6 @@ func sanitizeFailureReason(s *string) { } func (a *APIConfig) SetChannelCert(certHex string, channelID string) error { - type apiSetChannelCertResponse struct { Success bool `json:"success"` Error null.String `json:"error"` @@ -300,19 +262,7 @@ func (a *APIConfig) DeleteVideos(videos []string) error { return errors.Err("invalid API response. Status code: %d", res.StatusCode) } -type VideoStatus struct { - ChannelID string - VideoID string - Status string - ClaimID string - ClaimName string - FailureReason string - Size *int64 - MetaDataVersion uint - IsTransferred *bool -} - -func (a *APIConfig) MarkVideoStatus(status VideoStatus) error { +func (a *APIConfig) MarkVideoStatus(status shared.VideoStatus) error { endpoint := a.ApiURL + "/yt/video_status" sanitizeFailureReason(&status.FailureReason) diff --git a/shared/shared.go b/shared/shared.go new file mode 100644 index 0000000..95f425b --- /dev/null +++ b/shared/shared.go @@ -0,0 +1,112 @@ +package shared + +import ( + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" +) + +type Fee struct { + Amount string `json:"amount"` + Address string `json:"address"` + Currency string `json:"currency"` +} +type YoutubeChannel struct { + ChannelId string `json:"channel_id"` + TotalVideos uint `json:"total_videos"` + TotalSubscribers uint `json:"total_subscribers"` + DesiredChannelName string `json:"desired_channel_name"` + Fee *Fee `json:"fee"` + ChannelClaimID string `json:"channel_claim_id"` + TransferState int `json:"transfer_state"` + PublishAddress string `json:"publish_address"` + PublicKey string `json:"public_key"` + LengthLimit int `json:"length_limit"` + SizeLimit int `json:"size_limit"` + LastUploadedVideo string `json:"last_uploaded_video"` +} + +type SyncFlags struct { + StopOnError bool + TakeOverExistingChannel bool + SkipSpaceCheck bool + SyncUpdate bool + SingleRun bool + RemoveDBUnpublished bool + UpgradeMetadata bool + DisableTransfers bool + QuickSync bool + MaxTries int + Refill int + Limit int + SyncStatus string + ChannelID string + SyncFrom int64 + SyncUntil int64 + ConcurrentJobs int + VideosLimit int + MaxVideoSize int + MaxVideoLength time.Duration +} + +func (f *SyncFlags) IsSingleChannelSync() bool { + return f.ChannelID != "" +} + +type VideoStatus struct { + ChannelID string + VideoID string + Status string + ClaimID string + ClaimName string + FailureReason string + Size *int64 + MetaDataVersion uint + IsTransferred *bool +} + +const ( + StatusPending = "pending" // waiting for permission to sync + StatusPendingEmail = "pendingemail" // permission granted but missing email + StatusQueued = "queued" // in sync queue. will be synced soon + StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon + StatusSyncing = "syncing" // syncing now + StatusSynced = "synced" // done + StatusFailed = "failed" + StatusFinalized = "finalized" // no more changes allowed + StatusAbandoned = "abandoned" // deleted on youtube or banned +) + +var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned} + +const LatestMetadataVersion = 2 + +const ( + VideoStatusPublished = "published" + VideoStatusFailed = "failed" + VideoStatusUpgradeFailed = "upgradefailed" + VideoStatusUnpublished = "unpublished" + VideoStatusTranferFailed = "transferfailed" +) + +const ( + TransferStateNotTouched = iota + TransferStatePending + TransferStateComplete + TransferStateManual +) + +type AwsConfigs struct { + AwsS3ID string + AwsS3Secret string + AwsS3Region string + AwsS3Bucket string +} + +func (a *AwsConfigs) GetS3AWSConfig() *aws.Config { + return &aws.Config{ + Credentials: credentials.NewStaticCredentials(a.AwsS3ID, a.AwsS3Secret, ""), + Region: &a.AwsS3Region, + } +} diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 639b394..0b0a3bb 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -13,6 +13,7 @@ import ( "time" "github.com/lbryio/ytsync/v5/downloader/ytdl" + "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/namer" @@ -430,7 +431,7 @@ type SyncParams struct { MaxVideoSize int Namer *namer.Namer MaxVideoLength time.Duration - Fee *sdk.Fee + Fee *shared.Fee DefaultAccount string } diff --git a/ytapi/ytapi.go b/ytapi/ytapi.go index 2d19e58..ddf13bc 100644 --- a/ytapi/ytapi.go +++ b/ytapi/ytapi.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/util" "github.com/lbryio/ytsync/v5/downloader/ytdl" @@ -208,7 +209,7 @@ func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopC } video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool) if err != nil { - errSDK := config.MarkVideoStatus(sdk.VideoStatus{ + errSDK := config.MarkVideoStatus(shared.VideoStatus{ ChannelID: channelID, VideoID: videoID, Status: "failed",