diff --git a/Gopkg.lock b/Gopkg.lock index 27863b6..2a72ba3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,7 +2,7 @@ [[projects]] - digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2" + digest = "1:9a88883f474d09f1da61894cd8115c7f33988d6941e4f6236324c777aaff8f2c" name = "github.com/PuerkitoBio/goquery" packages = ["."] pruneopts = "" @@ -10,7 +10,7 @@ version = "v1.4.1" [[projects]] - digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885" + digest = "1:e3726ad6f38f710e84c8dcd0e830014de6eaeea81f28d91ae898afecc078479a" name = "github.com/andybalholm/cascadia" packages = ["."] pruneopts = "" @@ -18,7 +18,7 @@ version = "v1.0.0" [[projects]] - digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709" + digest = "1:261d95f4464744d542759a7a33846f56f24113f5a93c7577f4cd7044f7cb3d76" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -55,7 +55,7 @@ [[projects]] branch = "master" - digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383" + digest = "1:cc8ebf0c6745d09f728f1fa4fbd29baaa2e3a65efb49b5fefb0c163171ee7863" name = "github.com/btcsuite/btcd" packages = [ "btcec", @@ -78,7 +78,7 @@ [[projects]] branch = "master" - digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed" + digest = "1:b0f4d2431c167d7127a029210c1a7cdc33c9114c1b3fd3582347baad5e832588" name = "github.com/btcsuite/btcutil" packages = [ ".", @@ -98,31 +98,20 @@ [[projects]] branch = "master" - digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1" + digest = "1:dfc248d5e6e1582fdec83796d3d1d451aa6cae773c4e4ba1dac2838caef6d381" name = "github.com/btcsuite/websocket" packages = ["."] pruneopts = "" revision = "31079b6807923eb23992c421b114992b95131b55" [[projects]] - digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5" + digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" packages = ["spew"] pruneopts = "" revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" -[[projects]] - digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421" - name = "github.com/garyburd/redigo" - packages = [ - "internal", - "redis", - ] - pruneopts = "" - revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e" - version = "v1.6.0" - [[projects]] digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a" name = "github.com/go-errors/errors" @@ -133,14 +122,14 @@ [[projects]] branch = "master" - digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304" + digest = "1:cd5bab9c9e23ffa6858eaa79dc827fd84bc24bc00b0cfb0b14036e393da2b1fa" name = "github.com/go-ini/ini" packages = ["."] pruneopts = "" revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e" [[projects]] - digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120" + digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b" name = "github.com/golang/protobuf" packages = ["proto"] pruneopts = "" @@ -148,7 +137,7 @@ version = "v1.1.0" [[projects]] - digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c" + digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6" name = "github.com/gorilla/websocket" packages = ["."] pruneopts = "" @@ -164,7 +153,7 @@ version = "v1.0" [[projects]] - digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52" + digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e" name = "github.com/jmespath/go-jmespath" packages = ["."] pruneopts = "" @@ -172,14 +161,14 @@ [[projects]] branch = "master" - digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3" + digest = "1:d261f80387a38eeddc1d819ee9ee56d37ca10fc02e6e09ff400fb0ce146e13dc" name = "github.com/lbryio/lbryschema.go" packages = ["pb"] pruneopts = "" revision = "185433f2fd0c732547654749b98b37e56223dd22" [[projects]] - digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d" + digest = "1:5e30b8342813a6a85a647f9277e34ffcd5872dc57ab590dd9b251b145b6ec88f" name = "github.com/lbryio/ozzo-validation" packages = ["."] pruneopts = "" @@ -203,7 +192,7 @@ revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" [[projects]] - digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4" + digest = "1:3cb50c403fa46c85697dbc4e06a95008689e058f33466b7eb8d31ea0eb291ea3" name = "github.com/nlopes/slack" packages = ["."] pruneopts = "" @@ -212,7 +201,7 @@ [[projects]] branch = "master" - digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50" + digest = "1:8d6d81d0d9d8153e65d637bda77a7c4e6ba496c61efac3578d7d8c981ac31a7b" name = "github.com/rylio/ytdl" packages = ["."] pruneopts = "" @@ -228,7 +217,7 @@ [[projects]] branch = "master" - digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e" + digest = "1:c92f01303e3ab3b5da92657841639cb53d1548f0d2733d12ef3b9fd9d47c869e" name = "github.com/sirupsen/logrus" packages = ["."] pruneopts = "" @@ -244,7 +233,7 @@ [[projects]] branch = "master" - digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881" + digest = "1:bfbf4a9c265ef41f8d03c9d91e340aaddae835710eaed6cd2e6be889cbc05f56" name = "github.com/spf13/cobra" packages = ["."] pruneopts = "" @@ -276,7 +265,7 @@ [[projects]] branch = "master" - digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f" + digest = "1:8af4dda167d0ef21ab0affc797bff87ed0e87c57bd1d9bf57ad8f72d348c7932" name = "golang.org/x/crypto" packages = [ "ripemd160", @@ -288,7 +277,7 @@ [[projects]] branch = "master" - digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac" + digest = "1:5dc6753986b9eeba4abdf05dedc5ba06bb52dad43cc8aad35ffb42bb7adfa68f" name = "golang.org/x/net" packages = [ "context", @@ -301,7 +290,7 @@ [[projects]] branch = "master" - digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7" + digest = "1:baee54aa41cb93366e76a9c29f8dd2e4c4e6a35ff89551721d5275d2c858edc9" name = "golang.org/x/sys" packages = [ "unix", @@ -312,7 +301,7 @@ [[projects]] branch = "master" - digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f" + digest = "1:b064108d68f82d0201d9f812297c928e57488e82ccdb77ed06ac69f64519a890" name = "google.golang.org/api" packages = [ "gensupport", @@ -325,7 +314,7 @@ revision = "ef86ce4234efee96020bde00391d6a9cfae66561" [[projects]] - digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575" + digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d" name = "gopkg.in/nullbio/null.v6" packages = ["convert"] pruneopts = "" @@ -348,7 +337,6 @@ "github.com/btcsuite/btcutil", "github.com/btcsuite/btcutil/base58", "github.com/davecgh/go-spew/spew", - "github.com/garyburd/redigo/redis", "github.com/go-errors/errors", "github.com/go-ini/ini", "github.com/lbryio/lbryschema.go/pb", diff --git a/Gopkg.toml b/Gopkg.toml index 1eeae38..d1e4711 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -2,10 +2,6 @@ name = "github.com/davecgh/go-spew" version = "1.1.0" -[[constraint]] - name = "github.com/garyburd/redigo" - version = "1.1.0" - [[constraint]] name = "github.com/go-errors/errors" version = "1.0.0" diff --git a/cmd/count.go b/cmd/count.go index cfbfa65..77f6952 100644 --- a/cmd/count.go +++ b/cmd/count.go @@ -2,6 +2,7 @@ package cmd import ( sync "github.com/lbryio/lbry.go/ytsync" + "github.com/lbryio/lbry.go/ytsync/sdk" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -22,7 +23,9 @@ func ytcount(cmd *cobra.Command, args []string) { channelID := args[1] s := sync.Sync{ - YoutubeAPIKey: ytAPIKey, + APIConfig: &sdk.APIConfig{ + YoutubeAPIKey: ytAPIKey, + }, YoutubeChannelID: channelID, } diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 89c1bcd..affd9fe 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -9,6 +9,7 @@ import ( "github.com/lbryio/lbry.go/util" sync "github.com/lbryio/lbry.go/ytsync" + "github.com/lbryio/lbry.go/ytsync/sdk" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -141,35 +142,40 @@ func ytSync(cmd *cobra.Command, args []string) { blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/" } - sm := sync.SyncManager{ - StopOnError: stopOnError, - MaxTries: maxTries, - TakeOverExistingChannel: takeOverExistingChannel, - Refill: refill, - Limit: limit, - SkipSpaceCheck: skipSpaceCheck, - SyncUpdate: syncUpdate, - SyncStatus: syncStatus, - SyncFrom: syncFrom, - SyncUntil: syncUntil, - ConcurrentJobs: concurrentJobs, - ConcurrentVideos: concurrentJobs, - HostName: hostname, - YoutubeChannelID: channelID, - YoutubeAPIKey: youtubeAPIKey, - ApiURL: apiURL, - ApiToken: apiToken, - BlobsDir: blobsDir, - VideosLimit: videosLimit, - MaxVideoSize: maxVideoSize, - LbrycrdString: lbrycrdString, - AwsS3ID: awsS3ID, - AwsS3Secret: awsS3Secret, - AwsS3Region: awsS3Region, - AwsS3Bucket: awsS3Bucket, - SingleRun: singleRun, + syncProperties := &sdk.SyncProperties{ + SyncFrom: syncFrom, + SyncUntil: syncUntil, + YoutubeChannelID: channelID, } - + apiConfig := &sdk.APIConfig{ + YoutubeAPIKey: youtubeAPIKey, + ApiURL: apiURL, + ApiToken: apiToken, + HostName: hostname, + } + sm := sync.NewSyncManager( + stopOnError, + maxTries, + takeOverExistingChannel, + refill, + limit, + skipSpaceCheck, + syncUpdate, + concurrentJobs, + concurrentJobs, + blobsDir, + videosLimit, + maxVideoSize, + lbrycrdString, + awsS3ID, + awsS3Secret, + awsS3Region, + awsS3Bucket, + syncStatus, + singleRun, + syncProperties, + apiConfig, + ) err := sm.Start() if err != nil { sync.SendErrorToSlack(err.Error()) diff --git a/ytsync/count.go b/ytsync/count.go index 7900921..dbba141 100644 --- a/ytsync/count.go +++ b/ytsync/count.go @@ -11,7 +11,7 @@ import ( func (s *Sync) CountVideos() (uint64, error) { client := &http.Client{ - Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey}, } service, err := youtube.New(client) diff --git a/ytsync/manager.go b/ytsync/manager.go index 8d45e25..de9e67f 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -1,49 +1,71 @@ package ytsync import ( - "encoding/json" "fmt" - "io/ioutil" - "net/http" - "net/url" - "strconv" "strings" "syscall" "time" "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/null" "github.com/lbryio/lbry.go/util" + "github.com/lbryio/lbry.go/ytsync/namer" + "github.com/lbryio/lbry.go/ytsync/sdk" log "github.com/sirupsen/logrus" ) type SyncManager struct { - StopOnError bool - MaxTries int - TakeOverExistingChannel bool - Refill int - Limit int - SkipSpaceCheck bool - SyncUpdate bool - SyncStatus string - SyncFrom int64 - SyncUntil int64 - ConcurrentJobs int - ConcurrentVideos int - HostName string - YoutubeChannelID string - YoutubeAPIKey string - ApiURL string - ApiToken string - BlobsDir string - VideosLimit int - MaxVideoSize int - LbrycrdString string - AwsS3ID string - AwsS3Secret string - AwsS3Region string - AwsS3Bucket string - SingleRun bool + stopOnError bool + maxTries int + takeOverExistingChannel bool + refill int + limit int + skipSpaceCheck bool + syncUpdate bool + concurrentJobs int + concurrentVideos int + blobsDir string + videosLimit int + maxVideoSize int + lbrycrdString string + awsS3ID string + awsS3Secret string + awsS3Region string + syncStatus string + awsS3Bucket string + singleRun bool + syncProperties *sdk.SyncProperties + apiConfig *sdk.APIConfig + namer *namer.Namer +} + +func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int, + skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, + maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, + syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager { + return &SyncManager{ + stopOnError: stopOnError, + maxTries: maxTries, + takeOverExistingChannel: takeOverExistingChannel, + refill: refill, + limit: limit, + skipSpaceCheck: skipSpaceCheck, + syncUpdate: syncUpdate, + concurrentJobs: concurrentJobs, + concurrentVideos: concurrentVideos, + blobsDir: blobsDir, + videosLimit: videosLimit, + maxVideoSize: maxVideoSize, + lbrycrdString: lbrycrdString, + awsS3ID: awsS3ID, + awsS3Secret: awsS3Secret, + awsS3Region: awsS3Region, + awsS3Bucket: awsS3Bucket, + syncStatus: syncStatus, + singleRun: singleRun, + syncProperties: syncProperties, + apiConfig: apiConfig, + namer: namer.NewNamer(), + } } const ( @@ -57,143 +79,13 @@ const ( var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized} -type apiJobsResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []apiYoutubeChannel `json:"data"` -} - -type apiYoutubeChannel struct { - ChannelId string `json:"channel_id"` - TotalVideos uint `json:"total_videos"` - DesiredChannelName string `json:"desired_channel_name"` - SyncServer null.String `json:"sync_server"` -} - -func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { - endpoint := s.ApiURL + "/yt/jobs" - res, _ := http.PostForm(endpoint, url.Values{ - "auth_token": {s.ApiToken}, - "sync_status": {status}, - "min_videos": {strconv.Itoa(1)}, - "after": {strconv.Itoa(int(s.SyncFrom))}, - "before": {strconv.Itoa(int(s.SyncUntil))}, - "sync_server": {s.HostName}, - "channel_id": {s.YoutubeChannelID}, - }) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - var response apiJobsResponse - err := json.Unmarshal(body, &response) - if err != nil { - return nil, err - } - if response.Data == nil { - return nil, errors.Err(response.Error) - } - log.Printf("Fetched channels: %d", len(response.Data)) - return response.Data, nil -} - -type apiChannelStatusResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []syncedVideo `json:"data"` -} - -type syncedVideo struct { - VideoID string `json:"video_id"` - Published bool `json:"published"` - FailureReason string `json:"failure_reason"` - ClaimName string `json:"claim_name"` -} - -func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) { - endpoint := s.ApiURL + "/yt/channel_status" - if len(failureReason) > maxReasonLength { - failureReason = failureReason[:maxReasonLength] - } - res, _ := http.PostForm(endpoint, url.Values{ - "channel_id": {channelID}, - "sync_server": {s.HostName}, - "auth_token": {s.ApiToken}, - "sync_status": {status}, - "failure_reason": {failureReason}, - }) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - var response apiChannelStatusResponse - err := json.Unmarshal(body, &response) - if err != nil { - return nil, nil, err - } - if !response.Error.IsNull() { - return nil, nil, errors.Err(response.Error.String) - } - if response.Data != nil { - svs := make(map[string]syncedVideo) - claimNames := make(map[string]bool) - for _, v := range response.Data { - svs[v.VideoID] = v - claimNames[v.ClaimName] = v.Published - } - return svs, claimNames, nil - } - return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) -} - const ( VideoStatusPublished = "published" VideoStatusFailed = "failed" ) -func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { - endpoint := s.ApiURL + "/yt/video_status" - if len(failureReason) > maxReasonLength { - failureReason = failureReason[:maxReasonLength] - } - vals := url.Values{ - "youtube_channel_id": {channelID}, - "video_id": {videoID}, - "status": {status}, - "auth_token": {s.ApiToken}, - } - if status == VideoStatusPublished { - if claimID == "" || claimName == "" { - return errors.Err("claimID or claimName missing") - } - vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10)) - vals.Add("claim_id", claimID) - vals.Add("claim_name", claimName) - if size != nil { - vals.Add("size", strconv.FormatInt(*size, 10)) - } - } - if failureReason != "" { - vals.Add("failure_reason", failureReason) - } - res, _ := http.PostForm(endpoint, vals) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - var response struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data null.String `json:"data"` - } - err := json.Unmarshal(body, &response) - if err != nil { - return err - } - if !response.Error.IsNull() { - return errors.Err(response.Error.String) - } - if !response.Data.IsNull() && response.Data.String == "ok" { - return nil - } - return errors.Err("invalid API response. Status code: %d", res.StatusCode) -} - func (s *SyncManager) Start() error { + syncCount := 0 for { err := s.checkUsedSpace() @@ -204,9 +96,9 @@ func (s *SyncManager) Start() error { var syncs []Sync shouldInterruptLoop := false - isSingleChannelSync := s.YoutubeChannelID != "" + isSingleChannelSync := s.syncProperties.YoutubeChannelID != "" if isSingleChannelSync { - channels, err := s.fetchChannels("") + channels, err := s.apiConfig.FetchChannels("", s.syncProperties) if err != nil { return err } @@ -216,52 +108,53 @@ func (s *SyncManager) Start() error { lbryChannelName := channels[0].DesiredChannelName syncs = make([]Sync, 1) syncs[0] = Sync{ - YoutubeAPIKey: s.YoutubeAPIKey, - YoutubeChannelID: s.YoutubeChannelID, + APIConfig: s.apiConfig, + YoutubeChannelID: s.syncProperties.YoutubeChannelID, LbryChannelName: lbryChannelName, - StopOnError: s.StopOnError, - MaxTries: s.MaxTries, - ConcurrentVideos: s.ConcurrentVideos, - TakeOverExistingChannel: s.TakeOverExistingChannel, - Refill: s.Refill, + StopOnError: s.stopOnError, + MaxTries: s.maxTries, + ConcurrentVideos: s.concurrentVideos, + TakeOverExistingChannel: s.takeOverExistingChannel, + Refill: s.refill, Manager: s, - LbrycrdString: s.LbrycrdString, - AwsS3ID: s.AwsS3ID, - AwsS3Secret: s.AwsS3Secret, - AwsS3Region: s.AwsS3Region, - AwsS3Bucket: s.AwsS3Bucket, + LbrycrdString: s.lbrycrdString, + AwsS3ID: s.awsS3ID, + AwsS3Secret: s.awsS3Secret, + AwsS3Region: s.awsS3Region, + AwsS3Bucket: s.awsS3Bucket, + namer: s.namer, } shouldInterruptLoop = true } else { var queuesToSync []string - if s.SyncStatus != "" { - queuesToSync = append(queuesToSync, s.SyncStatus) - } else if s.SyncUpdate { + if s.syncStatus != "" { + queuesToSync = append(queuesToSync, s.syncStatus) + } else if s.syncUpdate { queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) } else { queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) } for _, q := range queuesToSync { - channels, err := s.fetchChannels(q) + channels, err := s.apiConfig.FetchChannels(q, s.syncProperties) if err != nil { return err } for _, c := range channels { syncs = append(syncs, Sync{ - YoutubeAPIKey: s.YoutubeAPIKey, + APIConfig: s.apiConfig, YoutubeChannelID: c.ChannelId, LbryChannelName: c.DesiredChannelName, - StopOnError: s.StopOnError, - MaxTries: s.MaxTries, - ConcurrentVideos: s.ConcurrentVideos, - TakeOverExistingChannel: s.TakeOverExistingChannel, - Refill: s.Refill, + StopOnError: s.stopOnError, + MaxTries: s.maxTries, + ConcurrentVideos: s.concurrentVideos, + TakeOverExistingChannel: s.takeOverExistingChannel, + Refill: s.refill, Manager: s, - LbrycrdString: s.LbrycrdString, - AwsS3ID: s.AwsS3ID, - AwsS3Secret: s.AwsS3Secret, - AwsS3Region: s.AwsS3Region, - AwsS3Bucket: s.AwsS3Bucket, + LbrycrdString: s.lbrycrdString, + AwsS3ID: s.awsS3ID, + AwsS3Secret: s.awsS3Secret, + AwsS3Region: s.awsS3Region, + AwsS3Bucket: s.awsS3Bucket, }) } } @@ -294,12 +187,12 @@ func (s *SyncManager) Start() error { if !shouldNotCount { syncCount++ } - if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { + if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) { shouldInterruptLoop = true break } } - if shouldInterruptLoop || s.SingleRun { + if shouldInterruptLoop || s.singleRun { break } } @@ -307,11 +200,11 @@ func (s *SyncManager) Start() error { } func (s *SyncManager) checkUsedSpace() error { - usedPctile, err := GetUsedSpace(s.BlobsDir) + usedPctile, err := GetUsedSpace(s.blobsDir) if err != nil { return err } - if usedPctile >= 0.90 && !s.SkipSpaceCheck { + if usedPctile >= 0.90 && !s.skipSpaceCheck { return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) } log.Infof("disk usage: %.1f%%", usedPctile*100) diff --git a/ytsync/namer/names.go b/ytsync/namer/names.go new file mode 100644 index 0000000..531bb67 --- /dev/null +++ b/ytsync/namer/names.go @@ -0,0 +1,83 @@ +package namer + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "regexp" + "strconv" + "strings" + "sync" +) + +var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`) + +type Namer struct { + mu *sync.Mutex + names map[string]bool +} + +func NewNamer() *Namer { + return &Namer{ + mu: &sync.Mutex{}, + names: make(map[string]bool), + } +} + +func (n *Namer) SetNames(names map[string]bool) { + n.names = names +} + +func (n *Namer) GetNextName(prefix string) string { + n.mu.Lock() + defer n.mu.Unlock() + + attempt := 1 + var name string + for { + name = getClaimNameFromTitle(prefix, attempt) + if _, exists := n.names[name]; !exists { + break + } + attempt++ + } + + //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash + if len(name) < 2 { + sum := md5.Sum([]byte(prefix)) + name = fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:])[:15], attempt) + } + + n.names[name] = true + + return name +} + +// TODO: clean this up some +func getClaimNameFromTitle(title string, attempt int) string { + suffix := "" + if attempt > 1 { + suffix = "-" + strconv.Itoa(attempt) + } + maxLen := 40 - len(suffix) + + chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-") + + name := chunks[0] + if len(name) > maxLen { + return name[:maxLen] + } + + for _, chunk := range chunks[1:] { + tmpName := name + "-" + chunk + if len(tmpName) > maxLen { + if len(name) < 20 { + name = tmpName[:maxLen] + } + break + } + name = tmpName + } + + return name + suffix +} diff --git a/ytsync/sdk/api.go b/ytsync/sdk/api.go new file mode 100644 index 0000000..1223b0f --- /dev/null +++ b/ytsync/sdk/api.go @@ -0,0 +1,170 @@ +package sdk + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/lbry.go/null" +) + +const ( + MaxReasonLength = 500 +) + +type APIConfig struct { + YoutubeAPIKey string + ApiURL string + ApiToken string + HostName string +} + +type SyncProperties struct { + SyncFrom int64 + SyncUntil int64 + YoutubeChannelID string +} + +type YoutubeChannel struct { + ChannelId string `json:"channel_id"` + TotalVideos uint `json:"total_videos"` + DesiredChannelName string `json:"desired_channel_name"` + SyncServer null.String `json:"sync_server"` + Fee *struct { + Amount string `json:"amount"` + Address string `json:"address"` + Currency string `json:"currency"` + } `json:"fee"` +} + +func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) { + type apiJobsResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data []YoutubeChannel `json:"data"` + } + endpoint := a.ApiURL + "/yt/jobs" + res, _ := http.PostForm(endpoint, url.Values{ + "auth_token": {a.ApiToken}, + "sync_status": {status}, + "min_videos": {strconv.Itoa(1)}, + "after": {strconv.Itoa(int(cp.SyncFrom))}, + "before": {strconv.Itoa(int(cp.SyncUntil))}, + "sync_server": {a.HostName}, + "channel_id": {cp.YoutubeChannelID}, + }) + defer res.Body.Close() + body, _ := ioutil.ReadAll(res.Body) + var response apiJobsResponse + err := json.Unmarshal(body, &response) + if err != nil { + return nil, err + } + if response.Data == nil { + return nil, errors.Err(response.Error) + } + log.Printf("Fetched channels: %d", len(response.Data)) + return response.Data, nil +} + +type SyncedVideo struct { + VideoID string `json:"video_id"` + Published bool `json:"published"` + FailureReason string `json:"failure_reason"` + ClaimName string `json:"claim_name"` +} + +func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) { + type apiChannelStatusResponse struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data []SyncedVideo `json:"data"` + } + endpoint := a.ApiURL + "/yt/channel_status" + if len(failureReason) > MaxReasonLength { + failureReason = failureReason[:MaxReasonLength] + } + res, _ := http.PostForm(endpoint, url.Values{ + "channel_id": {channelID}, + "sync_server": {a.HostName}, + "auth_token": {a.ApiToken}, + "sync_status": {status}, + "failure_reason": {failureReason}, + }) + defer res.Body.Close() + body, _ := ioutil.ReadAll(res.Body) + var response apiChannelStatusResponse + err := json.Unmarshal(body, &response) + if err != nil { + return nil, nil, err + } + if !response.Error.IsNull() { + return nil, nil, errors.Err(response.Error.String) + } + if response.Data != nil { + svs := make(map[string]SyncedVideo) + claimNames := make(map[string]bool) + for _, v := range response.Data { + svs[v.VideoID] = v + claimNames[v.ClaimName] = v.Published + } + return svs, claimNames, nil + } + return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) +} + +const ( + VideoStatusPublished = "published" + VideoStatusFailed = "failed" +) + +func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { + endpoint := a.ApiURL + "/yt/video_status" + if len(failureReason) > MaxReasonLength { + failureReason = failureReason[:MaxReasonLength] + } + vals := url.Values{ + "youtube_channel_id": {channelID}, + "video_id": {videoID}, + "status": {status}, + "auth_token": {a.ApiToken}, + } + if status == VideoStatusPublished { + if claimID == "" || claimName == "" { + return errors.Err("claimID or claimName missing") + } + vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10)) + vals.Add("claim_id", claimID) + vals.Add("claim_name", claimName) + if size != nil { + vals.Add("size", strconv.FormatInt(*size, 10)) + } + } + if failureReason != "" { + vals.Add("failure_reason", failureReason) + } + res, _ := http.PostForm(endpoint, vals) + defer res.Body.Close() + body, _ := ioutil.ReadAll(res.Body) + var response struct { + Success bool `json:"success"` + Error null.String `json:"error"` + Data null.String `json:"data"` + } + err := json.Unmarshal(body, &response) + if err != nil { + return err + } + if !response.Error.IsNull() { + return errors.Err(response.Error.String) + } + if !response.Data.IsNull() && response.Data.String == "ok" { + return nil + } + return errors.Err("invalid API response. Status code: %d", res.StatusCode) +} diff --git a/ytsync/setup.go b/ytsync/setup.go index 113bbb5..05d2c18 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error { s.syncedVideosMux.RUnlock() log.Debugf("We already allocated credits for %d videos", numPublished) - if numOnSource-numPublished > s.Manager.VideosLimit { - numOnSource = s.Manager.VideosLimit + if numOnSource-numPublished > s.Manager.videosLimit { + numOnSource = s.Manager.videosLimit } minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount diff --git a/ytsync/sources/shared.go b/ytsync/sources/shared.go index ba5be9e..96d5121 100644 --- a/ytsync/sources/shared.go +++ b/ytsync/sources/shared.go @@ -1,90 +1,27 @@ package sources import ( - "fmt" - "regexp" - "strconv" "strings" - "sync" - - "crypto/md5" - "encoding/hex" "github.com/lbryio/lbry.go/jsonrpc" - log "github.com/sirupsen/logrus" + "github.com/lbryio/lbry.go/ytsync/namer" ) -var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`) - type SyncSummary struct { ClaimID string ClaimName string } -func getClaimNameFromTitle(title string, attempt int) string { - suffix := "" - if attempt > 1 { - suffix = "-" + strconv.Itoa(attempt) - } - maxLen := 40 - len(suffix) - - chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-") - - name := chunks[0] - if len(name) > maxLen { - return name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - - return name + suffix -} - -func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { - attempt := 0 +func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *namer.Namer) (*SyncSummary, error) { for { - attempt++ - name := getClaimNameFromTitle(title, attempt) - - syncedVideosMux.Lock() - _, exists := claimNames[name] - if exists { - log.Printf("name exists, retrying (%d attempts so far)", attempt) - syncedVideosMux.Unlock() - continue - } - claimNames[name] = false - syncedVideosMux.Unlock() - - //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash - if len(name) < 2 { - hasher := md5.New() - hasher.Write([]byte(title)) - name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt) - } - + name := namer.GetNextName(title) response, err := daemon.Publish(name, filename, amount, options) - if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { - syncedVideosMux.Lock() - claimNames[name] = true - syncedVideosMux.Unlock() - if err == nil { - return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil - } else { - log.Printf("name exists, retrying (%d attempts so far)", attempt) + if err != nil { + if strings.Contains(err.Error(), "failed: Multiple claims (") { continue } - } else { return nil, err } + return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil } } diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index 3819033..57fdc33 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -6,15 +6,15 @@ import ( "regexp" "strconv" "strings" - "time" - "sync" + "time" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/ytsync/namer" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -174,7 +174,7 @@ func (v *ucbVideo) saveThumbnail() error { return err } -func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) { options := jsonrpc.PublishOptions{ Title: &v.title, Author: strPtr("UC Berkeley"), @@ -187,16 +187,14 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f ChangeAddress: &claimAddress, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer) } func (v *ucbVideo) Size() *int64 { return nil } -func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { - v.claimNames = claimNames - v.syncedVideosMux = syncedVideosMux +func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -210,7 +208,7 @@ func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount floa //} //log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelID) + summary, err := v.publish(daemon, claimAddress, amount, channelID, namer) if err != nil { return nil, errors.Prefix("publish error", err) } diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index d67587d..925dfc3 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -9,12 +9,12 @@ import ( "regexp" "strconv" "strings" - "time" - "sync" + "time" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/ytsync/namer" "github.com/rylio/ytdl" log "github.com/sirupsen/logrus" @@ -155,6 +155,8 @@ func (v *YoutubeVideo) download() error { err = videoInfo.Download(formats[formatIndex], downloadedFile) downloadedFile.Close() if err != nil { + //delete the video and ignore the error + _ = v.delete() break } fi, err := os.Stat(v.getFilename()) @@ -172,7 +174,6 @@ func (v *YoutubeVideo) download() error { break } } - return err } @@ -234,7 +235,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() error { func strPtr(s string) *string { return &s } -func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) { if channelID == "" { return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? } @@ -249,18 +250,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou ChangeAddress: &claimAddress, ChannelID: &channelID, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) + + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer) } func (v *YoutubeVideo) Size() *int64 { return v.size } -func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { - v.claimNames = claimNames - v.syncedVideosMux = syncedVideosMux +func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) { v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 - //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -274,14 +273,11 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount } log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelID) + summary, err := v.publish(daemon, claimAddress, amount, channelID, namer) //delete the video in all cases (and ignore the error) _ = v.delete() - if err != nil { - return nil, errors.Prefix("publish error", err) - } - return summary, nil + return summary, errors.Prefix("publish error", err) } // sorting videos diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 59c3864..784b06b 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -17,17 +17,20 @@ import ( "syscall" "time" + "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/stop" + "github.com/lbryio/lbry.go/util" + "github.com/lbryio/lbry.go/ytsync/namer" + "github.com/lbryio/lbry.go/ytsync/sdk" + "github.com/lbryio/lbry.go/ytsync/sources" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/stop" - "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" "google.golang.org/api/googleapi/transport" @@ -46,7 +49,7 @@ type video interface { IDAndNum() string PlaylistPosition() int PublishedAt() time.Time - Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error) + Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer) (*sources.SyncSummary, error) } // sorting videos @@ -58,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[ // Sync stores the options that control how syncing happens type Sync struct { - YoutubeAPIKey string + APIConfig *sdk.APIConfig YoutubeChannelID string LbryChannelName string StopOnError bool @@ -77,10 +80,10 @@ type Sync struct { claimAddress string videoDirectory string syncedVideosMux *sync.RWMutex - syncedVideos map[string]syncedVideo - claimNames map[string]bool + syncedVideos map[string]sdk.SyncedVideo grp *stop.Group lbryChannelID string + namer *namer.Namer walletMux *sync.Mutex queue chan video @@ -89,14 +92,11 @@ type Sync struct { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { s.syncedVideosMux.Lock() defer s.syncedVideosMux.Unlock() - s.syncedVideos[videoID] = syncedVideo{ + s.syncedVideos[videoID] = sdk.SyncedVideo{ VideoID: videoID, Published: published, FailureReason: failureReason, } - if claimName != "" { - s.claimNames[claimName] = true - } } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -221,13 +221,13 @@ func (s *Sync) uploadWallet() error { } func (s *Sync) setStatusSyncing() error { - syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") + syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "") if err != nil { return err } s.syncedVideosMux.Lock() s.syncedVideos = syncedVideos - s.claimNames = claimNames + s.Manager.namer.SetNames(claimNames) s.syncedVideosMux.Unlock() return nil } @@ -260,7 +260,7 @@ func (s *Sync) FullCycle() (e error) { err = s.downloadWallet() if err != nil && err.Error() != "wallet not on S3" { - return errors.Prefix("failure in downloading wallet: ", err) + return errors.Prefix("failure in downloading wallet", err) } else if err == nil { log.Println("Continuing previous upload") } else { @@ -311,14 +311,13 @@ func (s *Sync) setChannelTerminationStatus(e *error) { return } failureReason := (*e).Error() - _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) + _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) if err != nil { - msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) - err = errors.Prefix(msg, err) - *e = errors.Prefix(err.Error(), *e) + msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName) + *e = errors.Prefix(msg+err.Error(), *e) } } else if !s.IsInterrupted() { - _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") + _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "") if err != nil { *e = err } @@ -359,7 +358,7 @@ func (s *Sync) stopAndUploadWallet(e *error) { e = &err //not 100% sure return } else { - *e = errors.Prefix("failure uploading wallet: ", *e) + *e = errors.Prefix("failure uploading wallet", *e) } } } @@ -424,7 +423,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err pv, ok := s.syncedVideos[videoID] if !ok || pv.ClaimName != c.Name { fixed++ - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) + err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) if err != nil { return total, fixed, err } @@ -438,11 +437,11 @@ func (s *Sync) doSync() error { var err error claims, err := s.daemon.ClaimListMine() if err != nil { - return errors.Prefix("cannot list claims: ", err) + return errors.Prefix("cannot list claims", err) } hasDupes, err := s.fixDupes(*claims) if err != nil { - return errors.Prefix("error checking for duplicates: ", err) + return errors.Prefix("error checking for duplicates", err) } if hasDupes { SendInfoToSlack("Channel had dupes and was fixed!") @@ -452,13 +451,13 @@ func (s *Sync) doSync() error { } claims, err = s.daemon.ClaimListMine() if err != nil { - return errors.Prefix("cannot list claims: ", err) + return errors.Prefix("cannot list claims", err) } } pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims) if err != nil { - return errors.Prefix("error counting claims: ", err) + return errors.Prefix("error counting claims", err) } if nFixed > 0 { err := s.setStatusSyncing() @@ -466,7 +465,6 @@ func (s *Sync) doSync() error { return err } SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed) - } pubsOnDB := 0 for _, sv := range s.syncedVideos { @@ -593,7 +591,7 @@ func (s *Sync) startWorker(workerNum int) { SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } s.AppendSyncedVideo(v.ID(), false, err.Error(), "") - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) + err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } @@ -605,7 +603,7 @@ func (s *Sync) startWorker(workerNum int) { func (s *Sync) enqueueYoutubeVideos() error { client := &http.Client{ - Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey}, } service, err := youtube.New(client) @@ -770,7 +768,7 @@ func (s *Sync) processVideo(v video) (err error) { return nil } - if v.PlaylistPosition() > s.Manager.VideosLimit { + if v.PlaylistPosition() > s.Manager.videosLimit { log.Println(v.ID() + " is old: skipping") return nil } @@ -778,11 +776,13 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { return err } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux) + + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer) if err != nil { return err } - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) + + err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) }