diff --git a/cmd/count.go b/cmd/count.go index cfbfa65..77f6952 100644 --- a/cmd/count.go +++ b/cmd/count.go @@ -2,6 +2,7 @@ package cmd import ( sync "github.com/lbryio/lbry.go/ytsync" + "github.com/lbryio/lbry.go/ytsync/sdk" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -22,7 +23,9 @@ func ytcount(cmd *cobra.Command, args []string) { channelID := args[1] s := sync.Sync{ - YoutubeAPIKey: ytAPIKey, + APIConfig: &sdk.APIConfig{ + YoutubeAPIKey: ytAPIKey, + }, YoutubeChannelID: channelID, } diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 89c1bcd..7c75790 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -9,6 +9,7 @@ import ( "github.com/lbryio/lbry.go/util" sync "github.com/lbryio/lbry.go/ytsync" + "github.com/lbryio/lbry.go/ytsync/sdk" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -141,7 +142,7 @@ func ytSync(cmd *cobra.Command, args []string) { blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/" } - sm := sync.SyncManager{ + /*sm := sync.SyncManager{ StopOnError: stopOnError, MaxTries: maxTries, TakeOverExistingChannel: takeOverExistingChannel, @@ -150,26 +151,64 @@ func ytSync(cmd *cobra.Command, args []string) { SkipSpaceCheck: skipSpaceCheck, SyncUpdate: syncUpdate, SyncStatus: syncStatus, - SyncFrom: syncFrom, - SyncUntil: syncUntil, - ConcurrentJobs: concurrentJobs, - ConcurrentVideos: concurrentJobs, - HostName: hostname, - YoutubeChannelID: channelID, - YoutubeAPIKey: youtubeAPIKey, - ApiURL: apiURL, - ApiToken: apiToken, - BlobsDir: blobsDir, - VideosLimit: videosLimit, - MaxVideoSize: maxVideoSize, - LbrycrdString: lbrycrdString, - AwsS3ID: awsS3ID, - AwsS3Secret: awsS3Secret, - AwsS3Region: awsS3Region, - AwsS3Bucket: awsS3Bucket, - SingleRun: singleRun, + SyncProperties: &sdk.SyncProperties{ + SyncFrom: syncFrom, + SyncUntil: syncUntil, + YoutubeChannelID: channelID, + }, + APIConfig: &sdk.APIConfig{ + YoutubeAPIKey: youtubeAPIKey, + ApiURL: apiURL, + ApiToken: apiToken, + HostName: hostname, + }, + ConcurrentJobs: concurrentJobs, + ConcurrentVideos: concurrentJobs, + BlobsDir: blobsDir, + VideosLimit: videosLimit, + MaxVideoSize: maxVideoSize, + LbrycrdString: lbrycrdString, + AwsS3ID: awsS3ID, + AwsS3Secret: awsS3Secret, + AwsS3Region: awsS3Region, + AwsS3Bucket: awsS3Bucket, + SingleRun: singleRun, } - + */ + syncProperties := &sdk.SyncProperties{ + SyncFrom: syncFrom, + SyncUntil: syncUntil, + YoutubeChannelID: channelID, + } + apiConfig := &sdk.APIConfig{ + YoutubeAPIKey: youtubeAPIKey, + ApiURL: apiURL, + ApiToken: apiToken, + HostName: hostname, + } + sm := sync.NewSyncManager( + stopOnError, + maxTries, + takeOverExistingChannel, + refill, + limit, + skipSpaceCheck, + syncUpdate, + concurrentJobs, + concurrentJobs, + blobsDir, + videosLimit, + maxVideoSize, + lbrycrdString, + awsS3ID, + awsS3Secret, + awsS3Region, + awsS3Bucket, + syncStatus, + singleRun, + syncProperties, + apiConfig, + ) err := sm.Start() if err != nil { sync.SendErrorToSlack(err.Error()) diff --git a/ytsync/count.go b/ytsync/count.go index 7900921..dbba141 100644 --- a/ytsync/count.go +++ b/ytsync/count.go @@ -11,7 +11,7 @@ import ( func (s *Sync) CountVideos() (uint64, error) { client := &http.Client{ - Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey}, } service, err := youtube.New(client) diff --git a/ytsync/manager.go b/ytsync/manager.go index d619507..de9e67f 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -7,7 +7,6 @@ import ( "time" "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/null" "github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/ytsync/namer" "github.com/lbryio/lbry.go/ytsync/sdk" @@ -15,33 +14,57 @@ import ( ) type SyncManager struct { - StopOnError bool - MaxTries int - TakeOverExistingChannel bool - Refill int - Limit int - SkipSpaceCheck bool - SyncUpdate bool - ConcurrentJobs int - ConcurrentVideos int - BlobsDir string - VideosLimit int - MaxVideoSize int - LbrycrdString string - AwsS3ID string - AwsS3Secret string - AwsS3Region string - SyncStatus string - AwsS3Bucket string - SingleRun bool - ChannelProperties *sdk.ChannelProperties - APIConfig *sdk.APIConfig + stopOnError bool + maxTries int + takeOverExistingChannel bool + refill int + limit int + skipSpaceCheck bool + syncUpdate bool + concurrentJobs int + concurrentVideos int + blobsDir string + videosLimit int + maxVideoSize int + lbrycrdString string + awsS3ID string + awsS3Secret string + awsS3Region string + syncStatus string + awsS3Bucket string + singleRun bool + syncProperties *sdk.SyncProperties + apiConfig *sdk.APIConfig namer *namer.Namer } -func NewSyncManager() *SyncManager { +func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int, + skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, + maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, + syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager { return &SyncManager{ - namer: namer.NewNamer(), + stopOnError: stopOnError, + maxTries: maxTries, + takeOverExistingChannel: takeOverExistingChannel, + refill: refill, + limit: limit, + skipSpaceCheck: skipSpaceCheck, + syncUpdate: syncUpdate, + concurrentJobs: concurrentJobs, + concurrentVideos: concurrentVideos, + blobsDir: blobsDir, + videosLimit: videosLimit, + maxVideoSize: maxVideoSize, + lbrycrdString: lbrycrdString, + awsS3ID: awsS3ID, + awsS3Secret: awsS3Secret, + awsS3Region: awsS3Region, + awsS3Bucket: awsS3Bucket, + syncStatus: syncStatus, + singleRun: singleRun, + syncProperties: syncProperties, + apiConfig: apiConfig, + namer: namer.NewNamer(), } } @@ -56,47 +79,12 @@ const ( var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized} -type apiJobsResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []apiYoutubeChannel `json:"data"` -} - -type apiYoutubeChannel struct { - ChannelId string `json:"channel_id"` - TotalVideos uint `json:"total_videos"` - DesiredChannelName string `json:"desired_channel_name"` - SyncServer null.String `json:"sync_server"` - Fee *struct { - Amount string `json:"amount"` - Address string `json:"address"` - Currency string `json:"currency"` - } `json:"fee"` -} - -type apiChannelStatusResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []syncedVideo `json:"data"` -} - -type syncedVideo struct { - VideoID string `json:"video_id"` - Published bool `json:"published"` - FailureReason string `json:"failure_reason"` - ClaimName string `json:"claim_name"` -} - const ( VideoStatusPublished = "published" VideoStatusFailed = "failed" ) func (s *SyncManager) Start() error { - if s.namer == nil { - // TODO: fix me, use NewSyncManager instead - s.namer = namer.NewNamer() - } syncCount := 0 for { @@ -108,9 +96,9 @@ func (s *SyncManager) Start() error { var syncs []Sync shouldInterruptLoop := false - isSingleChannelSync := s.ChannelProperties.YoutubeChannelID != "" + isSingleChannelSync := s.syncProperties.YoutubeChannelID != "" if isSingleChannelSync { - channels, err := s.APIConfig.FetchChannels("", s.ChannelProperties) + channels, err := s.apiConfig.FetchChannels("", s.syncProperties) if err != nil { return err } @@ -120,53 +108,53 @@ func (s *SyncManager) Start() error { lbryChannelName := channels[0].DesiredChannelName syncs = make([]Sync, 1) syncs[0] = Sync{ - YoutubeAPIKey: s.YoutubeAPIKey, - YoutubeChannelID: s.YoutubeChannelID, + APIConfig: s.apiConfig, + YoutubeChannelID: s.syncProperties.YoutubeChannelID, LbryChannelName: lbryChannelName, - StopOnError: s.StopOnError, - MaxTries: s.MaxTries, - ConcurrentVideos: s.ConcurrentVideos, - TakeOverExistingChannel: s.TakeOverExistingChannel, - Refill: s.Refill, + StopOnError: s.stopOnError, + MaxTries: s.maxTries, + ConcurrentVideos: s.concurrentVideos, + TakeOverExistingChannel: s.takeOverExistingChannel, + Refill: s.refill, Manager: s, - LbrycrdString: s.LbrycrdString, - AwsS3ID: s.AwsS3ID, - AwsS3Secret: s.AwsS3Secret, - AwsS3Region: s.AwsS3Region, - AwsS3Bucket: s.AwsS3Bucket, + LbrycrdString: s.lbrycrdString, + AwsS3ID: s.awsS3ID, + AwsS3Secret: s.awsS3Secret, + AwsS3Region: s.awsS3Region, + AwsS3Bucket: s.awsS3Bucket, namer: s.namer, } shouldInterruptLoop = true } else { var queuesToSync []string - if s.SyncStatus != "" { - queuesToSync = append(queuesToSync, s.SyncStatus) - } else if s.SyncUpdate { + if s.syncStatus != "" { + queuesToSync = append(queuesToSync, s.syncStatus) + } else if s.syncUpdate { queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) } else { queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) } for _, q := range queuesToSync { - channels, err := s.APIConfig.FetchChannels(q, s.ChannelProperties) + channels, err := s.apiConfig.FetchChannels(q, s.syncProperties) if err != nil { return err } for _, c := range channels { syncs = append(syncs, Sync{ - YoutubeAPIKey: s.YoutubeAPIKey, + APIConfig: s.apiConfig, YoutubeChannelID: c.ChannelId, LbryChannelName: c.DesiredChannelName, - StopOnError: s.StopOnError, - MaxTries: s.MaxTries, - ConcurrentVideos: s.ConcurrentVideos, - TakeOverExistingChannel: s.TakeOverExistingChannel, - Refill: s.Refill, + StopOnError: s.stopOnError, + MaxTries: s.maxTries, + ConcurrentVideos: s.concurrentVideos, + TakeOverExistingChannel: s.takeOverExistingChannel, + Refill: s.refill, Manager: s, - LbrycrdString: s.LbrycrdString, - AwsS3ID: s.AwsS3ID, - AwsS3Secret: s.AwsS3Secret, - AwsS3Region: s.AwsS3Region, - AwsS3Bucket: s.AwsS3Bucket, + LbrycrdString: s.lbrycrdString, + AwsS3ID: s.awsS3ID, + AwsS3Secret: s.awsS3Secret, + AwsS3Region: s.awsS3Region, + AwsS3Bucket: s.awsS3Bucket, }) } } @@ -199,12 +187,12 @@ func (s *SyncManager) Start() error { if !shouldNotCount { syncCount++ } - if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { + if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) { shouldInterruptLoop = true break } } - if shouldInterruptLoop || s.SingleRun { + if shouldInterruptLoop || s.singleRun { break } } @@ -212,11 +200,11 @@ func (s *SyncManager) Start() error { } func (s *SyncManager) checkUsedSpace() error { - usedPctile, err := GetUsedSpace(s.BlobsDir) + usedPctile, err := GetUsedSpace(s.blobsDir) if err != nil { return err } - if usedPctile >= 0.90 && !s.SkipSpaceCheck { + if usedPctile >= 0.90 && !s.skipSpaceCheck { return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) } log.Infof("disk usage: %.1f%%", usedPctile*100) diff --git a/ytsync/namer/names.go b/ytsync/namer/names.go index db339d2..531bb67 100644 --- a/ytsync/namer/names.go +++ b/ytsync/namer/names.go @@ -24,6 +24,10 @@ func NewNamer() *Namer { } } +func (n *Namer) SetNames(names map[string]bool) { + n.names = names +} + func (n *Namer) GetNextName(prefix string) string { n.mu.Lock() defer n.mu.Unlock() diff --git a/ytsync/sdk/api.go b/ytsync/sdk/api.go index 3f25440..1223b0f 100644 --- a/ytsync/sdk/api.go +++ b/ytsync/sdk/api.go @@ -24,7 +24,7 @@ type APIConfig struct { HostName string } -type ChannelProperties struct { +type SyncProperties struct { SyncFrom int64 SyncUntil int64 YoutubeChannelID string @@ -42,7 +42,7 @@ type YoutubeChannel struct { } `json:"fee"` } -func (a *APIConfig) FetchChannels(status string, cp *ChannelProperties) ([]YoutubeChannel, error) { +func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) { type apiJobsResponse struct { Success bool `json:"success"` Error null.String `json:"error"` @@ -79,7 +79,7 @@ type SyncedVideo struct { ClaimName string `json:"claim_name"` } -func (a *APIConfig) setChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) { +func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) { type apiChannelStatusResponse struct { Success bool `json:"success"` Error null.String `json:"error"` diff --git a/ytsync/setup.go b/ytsync/setup.go index 113bbb5..05d2c18 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error { s.syncedVideosMux.RUnlock() log.Debugf("We already allocated credits for %d videos", numPublished) - if numOnSource-numPublished > s.Manager.VideosLimit { - numOnSource = s.Manager.VideosLimit + if numOnSource-numPublished > s.Manager.videosLimit { + numOnSource = s.Manager.videosLimit } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index aaaa2f6..b3df5fc 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -17,18 +17,20 @@ import ( "syscall" "time" + "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/stop" + "github.com/lbryio/lbry.go/util" + "github.com/lbryio/lbry.go/ytsync/namer" + "github.com/lbryio/lbry.go/ytsync/sdk" + "github.com/lbryio/lbry.go/ytsync/sources" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/stop" - "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/namer" - "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" "google.golang.org/api/googleapi/transport" @@ -59,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[ // Sync stores the options that control how syncing happens type Sync struct { - YoutubeAPIKey string + APIConfig *sdk.APIConfig YoutubeChannelID string LbryChannelName string StopOnError bool @@ -78,8 +80,7 @@ type Sync struct { claimAddress string videoDirectory string syncedVideosMux *sync.RWMutex - syncedVideos map[string]syncedVideo - claimNames map[string]bool + syncedVideos map[string]sdk.SyncedVideo grp *stop.Group lbryChannelID string namer *namer.Namer @@ -91,14 +92,11 @@ type Sync struct { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { s.syncedVideosMux.Lock() defer s.syncedVideosMux.Unlock() - s.syncedVideos[videoID] = syncedVideo{ + s.syncedVideos[videoID] = sdk.SyncedVideo{ VideoID: videoID, Published: published, FailureReason: failureReason, } - if claimName != "" { - s.claimNames[claimName] = true - } } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -223,13 +221,13 @@ func (s *Sync) uploadWallet() error { } func (s *Sync) setStatusSyncing() error { - syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") + syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "") if err != nil { return err } s.syncedVideosMux.Lock() s.syncedVideos = syncedVideos - s.claimNames = claimNames + s.Manager.namer.SetNames(claimNames) s.syncedVideosMux.Unlock() return nil } @@ -313,14 +311,14 @@ func (s *Sync) setChannelTerminationStatus(e *error) { return } failureReason := (*e).Error() - _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) + _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) *e = errors.Prefix(err.Error(), *e) } } else if !s.IsInterrupted() { - _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") + _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "") if err != nil { *e = err } @@ -426,7 +424,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err pv, ok := s.syncedVideos[videoID] if !ok || pv.ClaimName != c.Name { fixed++ - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) + err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) if err != nil { return total, fixed, err } @@ -595,7 +593,7 @@ func (s *Sync) startWorker(workerNum int) { SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } s.AppendSyncedVideo(v.ID(), false, err.Error(), "") - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) + err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } @@ -607,7 +605,7 @@ func (s *Sync) startWorker(workerNum int) { func (s *Sync) enqueueYoutubeVideos() error { client := &http.Client{ - Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey}, } service, err := youtube.New(client) @@ -772,7 +770,7 @@ func (s *Sync) processVideo(v video) (err error) { return nil } - if v.PlaylistPosition() > s.Manager.VideosLimit { + if v.PlaylistPosition() > s.Manager.videosLimit { log.Println(v.ID() + " is old: skipping") return nil } @@ -781,12 +779,12 @@ func (s *Sync) processVideo(v video) (err error) { return err } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.namer) + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer) if err != nil { return err } - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) + err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) }