From 559d80cf3f0f8a7468580258921703cde8855d4a Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 14 Aug 2018 11:09:23 -0400 Subject: [PATCH 1/8] add size support --- ytsync/manager.go | 5 ++++- ytsync/sources/ucbVideo.go | 4 ++++ ytsync/sources/youtubeVideo.go | 7 +++++++ ytsync/ytsync.go | 8 +++++--- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 8aa8b1a..8eebfbf 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -141,7 +141,7 @@ const ( VideoStatusFailed = "failed" ) -func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { +func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { endpoint := s.ApiURL + "/yt/video_status" vals := url.Values{ @@ -157,6 +157,9 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10)) vals.Add("claim_id", claimID) vals.Add("claim_name", claimName) + if size != nil { + vals.Add("size", strconv.FormatInt(*size, 10)) + } } if failureReason != "" { maxReasonLength := 500 diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index f7ead6a..2e17bea 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -186,6 +186,10 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } +func (v ucbVideo) Size() *int64 { + return nil +} + func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 9fb091e..ffecc3f 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -25,6 +25,7 @@ type YoutubeVideo struct { title string description string playlistPosition int64 + size *int64 publishedAt time.Time dir string } @@ -203,6 +204,10 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } +func (v YoutubeVideo) Size() *int64 { + return v.size +} + func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() @@ -215,6 +220,8 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f if err != nil { return nil, err } + *v.size = fi.Size() + if fi.Size() > int64(maxVideoSize)*1024*1024 { //delete the video and ignore the error _ = v.delete() diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 2df409d..dfddac3 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -41,6 +41,7 @@ const ( ) type video interface { + Size() *int64 ID() string IDAndNum() string PlaylistPosition() int @@ -470,7 +471,7 @@ func (s *Sync) startWorker(workerNum int) { SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } s.AppendSyncedVideo(v.ID(), false, err.Error()) - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error()) + err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } @@ -671,10 +672,11 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { return err } - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") + err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) if err != nil { - return err + SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } + s.AppendSyncedVideo(v.ID(), true, "") return nil -- 2.45.2 From 697a6ad6bb307af71556d70f22eff467ed52b539 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 17 Aug 2018 10:05:54 -0400 Subject: [PATCH 2/8] user pointers! bug fixes now it works... --- ytsync/manager.go | 37 +++++++++++++++++----------------- ytsync/sources/ucbVideo.go | 28 ++++++++++++------------- ytsync/sources/youtubeVideo.go | 35 ++++++++++++++++---------------- ytsync/ytsync.go | 10 +++++---- 4 files changed, 57 insertions(+), 53 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 8eebfbf..3eed3b5 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -70,7 +70,7 @@ type apiYoutubeChannel struct { SyncServer null.String `json:"sync_server"` } -func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { +func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { endpoint := s.ApiURL + "/yt/jobs" res, _ := http.PostForm(endpoint, url.Values{ "auth_token": {s.ApiToken}, @@ -107,14 +107,17 @@ type syncedVideo struct { FailureReason string `json:"failure_reason"` } -func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { +func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, error) { endpoint := s.ApiURL + "/yt/channel_status" - + if len(failureReason) > maxReasonLength { + failureReason = failureReason[:maxReasonLength] + } res, _ := http.PostForm(endpoint, url.Values{ - "channel_id": {channelID}, - "sync_server": {s.HostName}, - "auth_token": {s.ApiToken}, - "sync_status": {status}, + "channel_id": {channelID}, + "sync_server": {s.HostName}, + "auth_token": {s.ApiToken}, + "sync_status": {status}, + "failure_reason": {failureReason}, }) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) @@ -141,9 +144,11 @@ const ( VideoStatusFailed = "failed" ) -func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { +func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { endpoint := s.ApiURL + "/yt/video_status" - + if len(failureReason) > maxReasonLength { + failureReason = failureReason[:maxReasonLength] + } vals := url.Values{ "youtube_channel_id": {channelID}, "video_id": {videoID}, @@ -162,10 +167,6 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st } } if failureReason != "" { - maxReasonLength := 500 - if len(failureReason) > maxReasonLength { - failureReason = failureReason[:500] - } vals.Add("failure_reason", failureReason) } res, _ := http.PostForm(endpoint, vals) @@ -189,7 +190,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st return errors.Err("invalid API response. Status code: %d", res.StatusCode) } -func (s SyncManager) Start() error { +func (s *SyncManager) Start() error { syncCount := 0 for { err := s.checkUsedSpace() @@ -223,7 +224,7 @@ func (s SyncManager) Start() error { ConcurrentVideos: s.ConcurrentVideos, TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, - Manager: &s, + Manager: s, LbrycrdString: s.LbrycrdString, AwsS3ID: s.AwsS3ID, AwsS3Secret: s.AwsS3Secret, @@ -258,7 +259,7 @@ func (s SyncManager) Start() error { ConcurrentVideos: s.ConcurrentVideos, TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, - Manager: &s, + Manager: s, LbrycrdString: s.LbrycrdString, AwsS3ID: s.AwsS3ID, AwsS3Secret: s.AwsS3Secret, @@ -308,11 +309,11 @@ func (s SyncManager) Start() error { return nil } -func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { +func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName) } -func (s SyncManager) checkUsedSpace() error { +func (s *SyncManager) checkUsedSpace() error { usedPctile, err := GetUsedSpace(s.BlobsDir) if err != nil { return err diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index 2e17bea..0073007 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -28,9 +28,9 @@ type ucbVideo struct { dir string } -func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo { +func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo { p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors - return ucbVideo{ + return &ucbVideo{ id: id, title: title, description: description, @@ -40,19 +40,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi } } -func (v ucbVideo) ID() string { +func (v *ucbVideo) ID() string { return v.id } -func (v ucbVideo) PlaylistPosition() int { +func (v *ucbVideo) PlaylistPosition() int { return 0 } -func (v ucbVideo) IDAndNum() string { +func (v *ucbVideo) IDAndNum() string { return v.ID() + " (?)" } -func (v ucbVideo) PublishedAt() time.Time { +func (v *ucbVideo) PublishedAt() time.Time { return v.publishedAt //r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`) //matches := r.FindStringSubmatch(v.title) @@ -65,11 +65,11 @@ func (v ucbVideo) PublishedAt() time.Time { //return time.Now() } -func (v ucbVideo) getFilename() string { +func (v *ucbVideo) getFilename() string { return v.dir + "/" + v.id + ".mp4" } -func (v ucbVideo) getClaimName(attempt int) string { +func (v *ucbVideo) getClaimName(attempt int) string { reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) suffix := "" if attempt > 1 { @@ -98,7 +98,7 @@ func (v ucbVideo) getClaimName(attempt int) string { return name + suffix } -func (v ucbVideo) getAbbrevDescription() string { +func (v *ucbVideo) getAbbrevDescription() string { maxLines := 10 description := strings.TrimSpace(v.description) if strings.Count(description, "\n") < maxLines { @@ -107,7 +107,7 @@ func (v ucbVideo) getAbbrevDescription() string { return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." } -func (v ucbVideo) download() error { +func (v *ucbVideo) download() error { videoPath := v.getFilename() _, err := os.Stat(videoPath) @@ -146,7 +146,7 @@ func (v ucbVideo) download() error { return nil } -func (v ucbVideo) saveThumbnail() error { +func (v *ucbVideo) saveThumbnail() error { resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id) if err != nil { return err @@ -170,7 +170,7 @@ func (v ucbVideo) saveThumbnail() error { return err } -func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { options := jsonrpc.PublishOptions{ Title: &v.title, Author: strPtr("UC Berkeley"), @@ -186,11 +186,11 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } -func (v ucbVideo) Size() *int64 { +func (v *ucbVideo) Size() *int64 { return nil } -func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index ffecc3f..9b3e2fd 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -30,9 +30,9 @@ type YoutubeVideo struct { dir string } -func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo { +func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo { publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors - return YoutubeVideo{ + return &YoutubeVideo{ id: snippet.ResourceId.VideoId, title: snippet.Title, description: snippet.Description, @@ -43,23 +43,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You } } -func (v YoutubeVideo) ID() string { +func (v *YoutubeVideo) ID() string { return v.id } -func (v YoutubeVideo) PlaylistPosition() int { +func (v *YoutubeVideo) PlaylistPosition() int { return int(v.playlistPosition) } -func (v YoutubeVideo) IDAndNum() string { +func (v *YoutubeVideo) IDAndNum() string { return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)" } -func (v YoutubeVideo) PublishedAt() time.Time { +func (v *YoutubeVideo) PublishedAt() time.Time { return v.publishedAt } -func (v YoutubeVideo) getFilename() string { +func (v *YoutubeVideo) getFilename() string { maxLen := 30 reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) @@ -86,7 +86,7 @@ func (v YoutubeVideo) getFilename() string { return v.videoDir() + "/" + name + ".mp4" } -func (v YoutubeVideo) getAbbrevDescription() string { +func (v *YoutubeVideo) getAbbrevDescription() string { maxLines := 10 description := strings.TrimSpace(v.description) if strings.Count(description, "\n") < maxLines { @@ -95,7 +95,7 @@ func (v YoutubeVideo) getAbbrevDescription() string { return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." } -func (v YoutubeVideo) download() error { +func (v *YoutubeVideo) download() error { videoPath := v.getFilename() err := os.Mkdir(v.videoDir(), 0750) @@ -128,11 +128,11 @@ func (v YoutubeVideo) download() error { return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile) } -func (v YoutubeVideo) videoDir() string { +func (v *YoutubeVideo) videoDir() string { return v.dir + "/" + v.id } -func (v YoutubeVideo) delete() error { +func (v *YoutubeVideo) delete() error { videoPath := v.getFilename() err := os.Remove(videoPath) if err != nil { @@ -143,7 +143,7 @@ func (v YoutubeVideo) delete() error { return nil } -func (v YoutubeVideo) triggerThumbnailSave() error { +func (v *YoutubeVideo) triggerThumbnailSave() error { client := &http.Client{Timeout: 30 * time.Second} params, err := json.Marshal(map[string]string{"videoid": v.id}) @@ -186,7 +186,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error { func strPtr(s string) *string { return &s } -func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { if channelID == "" { return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? } @@ -204,11 +204,11 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) } -func (v YoutubeVideo) Size() *int64 { +func (v *YoutubeVideo) Size() *int64 { return v.size } -func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -220,9 +220,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f if err != nil { return nil, err } - *v.size = fi.Size() + videoSize := fi.Size() + v.size = &videoSize - if fi.Size() > int64(maxVideoSize)*1024*1024 { + if videoSize > int64(maxVideoSize)*1024*1024 { //delete the video and ignore the error _ = v.delete() return nil, errors.Err("the video is too big to sync, skipping for now") diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index dfddac3..9c0a31a 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -38,6 +38,7 @@ import ( const ( channelClaimAmount = 0.01 publishAmount = 0.01 + maxReasonLength = 500 ) type video interface { @@ -237,7 +238,7 @@ func (s *Sync) FullCycle() (e error) { log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() }() - syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) + syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") if err != nil { return err } @@ -299,14 +300,15 @@ func (s *Sync) updateChannelStatus(e *error) { if util.SubstringInSlice((*e).Error(), noFailConditions) { return } - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) + failureReason := (*e).Error() + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) *e = errors.Prefix(err.Error(), *e) } } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) + _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") if err != nil { *e = err } @@ -344,7 +346,7 @@ func (s *Sync) stopAndUploadWallet(e *error) { err := s.uploadWallet() if err != nil { if *e == nil { - e = &err + e = &err //not 100% sure return } else { *e = errors.Prefix("failure uploading wallet: ", *e) -- 2.45.2 From 6b65cb115ab3d8f594c8fea899851311b79a1e59 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 22 Aug 2018 18:28:31 -0400 Subject: [PATCH 3/8] detect name collisions based on api data improve mutex usage remove unnecessary line breaks --- ytsync/manager.go | 13 ++++++++----- ytsync/setup.go | 4 ++-- ytsync/sources/shared.go | 24 ++++++++++++------------ ytsync/sources/ucbVideo.go | 22 ++++++++++++++-------- ytsync/sources/youtubeVideo.go | 10 ++++++++-- ytsync/ytsync.go | 27 +++++++++++++++------------ 6 files changed, 59 insertions(+), 41 deletions(-) diff --git a/ytsync/manager.go b/ytsync/manager.go index 3eed3b5..f923f4f 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -105,9 +105,10 @@ type syncedVideo struct { VideoID string `json:"video_id"` Published bool `json:"published"` FailureReason string `json:"failure_reason"` + ClaimName string `json:"claim_name"` } -func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, error) { +func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) { endpoint := s.ApiURL + "/yt/channel_status" if len(failureReason) > maxReasonLength { failureReason = failureReason[:maxReasonLength] @@ -124,19 +125,21 @@ func (s *SyncManager) setChannelStatus(channelID string, status string, failureR var response apiChannelStatusResponse err := json.Unmarshal(body, &response) if err != nil { - return nil, err + return nil, nil, err } if !response.Error.IsNull() { - return nil, errors.Err(response.Error.String) + return nil, nil, errors.Err(response.Error.String) } if response.Data != nil { svs := make(map[string]syncedVideo) + claimNames := make(map[string]bool) for _, v := range response.Data { svs[v.VideoID] = v + claimNames[v.ClaimName] = v.Published } - return svs, nil + return svs, claimNames, nil } - return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) + return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) } const ( diff --git a/ytsync/setup.go b/ytsync/setup.go index afaa0af..6d72a03 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error { return nil } - s.syncedVideosMux.Lock() + s.syncedVideosMux.RLock() numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... - s.syncedVideosMux.Unlock() + s.syncedVideosMux.RUnlock() log.Debugf("We already published %d videos", numPublished) if numOnSource-numPublished > s.Manager.VideosLimit { diff --git a/ytsync/sources/shared.go b/ytsync/sources/shared.go index 5e1515a..ba5be9e 100644 --- a/ytsync/sources/shared.go +++ b/ytsync/sources/shared.go @@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string { return name + suffix } -var publishedNamesMutex sync.RWMutex -var publishedNames = map[string]bool{} - -func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) { +func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { attempt := 0 for { attempt++ name := getClaimNameFromTitle(title, attempt) - publishedNamesMutex.RLock() - _, exists := publishedNames[name] - publishedNamesMutex.RUnlock() + syncedVideosMux.Lock() + _, exists := claimNames[name] if exists { - log.Printf("name exists, retrying (%d attempts so far)\n", attempt) + log.Printf("name exists, retrying (%d attempts so far)", attempt) + syncedVideosMux.Unlock() continue } + claimNames[name] = false + syncedVideosMux.Unlock() + //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash if len(name) < 2 { hasher := md5.New() @@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string response, err := daemon.Publish(name, filename, amount, options) if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { - publishedNamesMutex.Lock() - publishedNames[name] = true - publishedNamesMutex.Unlock() + syncedVideosMux.Lock() + claimNames[name] = true + syncedVideosMux.Unlock() if err == nil { return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil } else { - log.Printf("name exists, retrying (%d attempts so far)\n", attempt) + log.Printf("name exists, retrying (%d attempts so far)", attempt) continue } } else { diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index 0073007..3819033 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "sync" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -20,12 +22,14 @@ import ( ) type ucbVideo struct { - id string - title string - channel string - description string - publishedAt time.Time - dir string + id string + title string + channel string + description string + publishedAt time.Time + dir string + claimNames map[string]bool + syncedVideosMux *sync.RWMutex } func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo { @@ -183,14 +187,16 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f ChangeAddress: &claimAddress, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) } func (v *ucbVideo) Size() *int64 { return nil } -func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { + v.claimNames = claimNames + v.syncedVideosMux = syncedVideosMux //download and thumbnail can be done in parallel err := v.download() if err != nil { diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 9b3e2fd..79ad52e 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "sync" + "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" @@ -28,6 +30,8 @@ type YoutubeVideo struct { size *int64 publishedAt time.Time dir string + claimNames map[string]bool + syncedVideosMux *sync.RWMutex } func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo { @@ -201,14 +205,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou ChangeAddress: &claimAddress, ChannelID: &channelID, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) } func (v *YoutubeVideo) Size() *int64 { return v.size } -func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { + v.claimNames = claimNames + v.syncedVideosMux = syncedVideosMux //download and thumbnail can be done in parallel err := v.download() if err != nil { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 9c0a31a..2c17059 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -47,7 +47,7 @@ type video interface { IDAndNum() string PlaylistPosition() int PublishedAt() time.Time - Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error) + Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error) } // sorting videos @@ -78,8 +78,9 @@ type Sync struct { claimAddress string videoDirectory string db *redisdb.DB + syncedVideosMux *sync.RWMutex syncedVideos map[string]syncedVideo - syncedVideosMux *sync.Mutex + claimNames map[string]bool grp *stop.Group lbryChannelID string @@ -87,7 +88,7 @@ type Sync struct { queue chan video } -func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { +func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { s.syncedVideosMux.Lock() defer s.syncedVideosMux.Unlock() s.syncedVideos[videoID] = syncedVideo{ @@ -95,6 +96,7 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s Published: published, FailureReason: failureReason, } + s.claimNames[claimName] = true } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -225,7 +227,7 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - s.syncedVideosMux = &sync.Mutex{} + s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.Mutex{} s.db = redisdb.New() s.grp = stop.New() @@ -238,12 +240,13 @@ func (s *Sync) FullCycle() (e error) { log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() }() - syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") + syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") if err != nil { return err } s.syncedVideosMux.Lock() s.syncedVideos = syncedVideos + s.claimNames = claimNames s.syncedVideosMux.Unlock() defer s.updateChannelStatus(&e) @@ -301,14 +304,14 @@ func (s *Sync) updateChannelStatus(e *error) { return } failureReason := (*e).Error() - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) + _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) *e = errors.Prefix(err.Error(), *e) } } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") + _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") if err != nil { *e = err } @@ -472,7 +475,7 @@ func (s *Sync) startWorker(workerNum int) { } SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } - s.AppendSyncedVideo(v.ID(), false, err.Error()) + s.AppendSyncedVideo(v.ID(), false, err.Error(), "") err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) @@ -630,9 +633,9 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - s.syncedVideosMux.Lock() + s.syncedVideosMux.RLock() sv, ok := s.syncedVideos[v.ID()] - s.syncedVideosMux.Unlock() + s.syncedVideosMux.RUnlock() alreadyPublished := ok && sv.Published neverRetryFailures := []string{ @@ -670,7 +673,7 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { return err } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux) if err != nil { return err } @@ -679,7 +682,7 @@ func (s *Sync) processVideo(v video) (err error) { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - s.AppendSyncedVideo(v.ID(), true, "") + s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName) return nil } -- 2.45.2 From c51ed5612dbb6b1caccb3b091c1d7e21f7c5c19b Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 22 Aug 2018 18:54:50 -0400 Subject: [PATCH 4/8] fix failure case --- ytsync/ytsync.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 2c17059..d44185b 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -96,7 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s Published: published, FailureReason: failureReason, } - s.claimNames[claimName] = true + if claimName != "" { + s.claimNames[claimName] = true + } } // SendErrorToSlack Sends an error message to the default channel and to the process log. -- 2.45.2 From f918f9085372a78f647fb6fd4af09046c012973e Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 31 Aug 2018 11:42:15 -0400 Subject: [PATCH 5/8] update response signature add dupes check add de-sync checks --- jsonrpc/daemon.go | 12 ++++++++ jsonrpc/daemon_types.go | 49 +++++++++++++++++------------ ytsync/ytsync.go | 68 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 106 insertions(+), 23 deletions(-) diff --git a/jsonrpc/daemon.go b/jsonrpc/daemon.go index 27bdfba..07d7374 100644 --- a/jsonrpc/daemon.go +++ b/jsonrpc/daemon.go @@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) { } return channel.ClaimsInChannel, nil } + +func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) { + response := new(ClaimListMineResponse) + err := d.call(response, "claim_list_mine", map[string]interface{}{}) + if err != nil { + return nil, err + } else if response == nil { + return nil, errors.Err("no response") + } + + return response, nil +} diff --git a/jsonrpc/daemon_types.go b/jsonrpc/daemon_types.go index 6b03286..d1177d3 100644 --- a/jsonrpc/daemon_types.go +++ b/jsonrpc/daemon_types.go @@ -31,25 +31,32 @@ type Support struct { } type Claim struct { - Address string `json:"address"` - Amount decimal.Decimal `json:"amount"` - ClaimID string `json:"claim_id"` - ClaimSequence int `json:"claim_sequence"` - DecodedClaim bool `json:"decoded_claim"` - Depth int `json:"depth"` - EffectiveAmount decimal.Decimal `json:"effective_amount"` - Height int `json:"height"` - Hex string `json:"hex"` - Name string `json:"name"` - Nout int `json:"nout"` - Supports []Support `json:"supports"` - Txid string `json:"txid"` - ValidAtHeight int `json:"valid_at_height"` - Value lbryschema.Claim `json:"value"` - Error *string `json:"error,omitempty"` - ChannelName *string `json:"channel_name,omitempty"` - HasSignature *bool `json:"has_signature,omitempty"` - SignatureIsValid *bool `json:"signature_is_valid,omitempty"` + Address string `json:"address"` + Amount decimal.Decimal `json:"amount"` + BlocksToExpiration int `json:"blocks_to_expiration"` + Category string `json:"category"` + ClaimID string `json:"claim_id"` + ClaimSequence int `json:"claim_sequence"` + Confirmations int `json:"confirmations"` + DecodedClaim bool `json:"decoded_claim"` + Depth int `json:"depth"` + EffectiveAmount decimal.Decimal `json:"effective_amount"` + ExpirationHeight int `json:"expiration_height"` + Expired bool `json:"expired"` + Height int `json:"height"` + Hex string `json:"hex"` + IsSpent bool `json:"is_spent"` + Name string `json:"name"` + Nout int `json:"nout"` + PermanentUrl string `json:"permanent_url"` + Supports []Support `json:"supports"` + Txid string `json:"txid"` + ValidAtHeight int `json:"valid_at_height"` + Value lbryschema.Claim `json:"value"` + Error *string `json:"error,omitempty"` + ChannelName *string `json:"channel_name,omitempty"` + HasSignature *bool `json:"has_signature,omitempty"` + SignatureIsValid *bool `json:"signature_is_valid,omitempty"` } type File struct { @@ -234,7 +241,9 @@ type ClaimListResponse struct { LastTakeoverHeight int `json:"last_takeover_height"` SupportsWithoutClaims []Support `json:"supports_without_claims"` } - +type ClaimListMineResponse struct { + Claims []Claim `json:"claims"` +} type ClaimShowResponse Claim type PeerListResponsePeer struct { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index d44185b..75ee593 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -365,9 +365,71 @@ func logShutdownError(shutdownErr error) { SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") } +func hasDupes(claims []jsonrpc.Claim) (bool, error) { + videoIDs := make(map[string]interface{}) + for _, c := range claims { + if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { + continue + } + if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { + return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID) + } + tn := *c.Value.Stream.Metadata.Thumbnail + videoID := tn[:strings.LastIndex(tn, "/")+1] + _, ok := videoIDs[videoID] + if !ok { + videoIDs[videoID] = nil + continue + } + return true, nil + } + return false, nil +} + +//publishesCount counts the amount of videos published so far +func publishesCount(claims []jsonrpc.Claim) (int, error) { + count := 0 + for _, c := range claims { + if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { + continue + } + if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { + return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID) + } + count++ + } + return count, nil +} + func (s *Sync) doSync() error { var err error - + claims, err := s.daemon.ClaimListMine() + if err != nil { + return errors.Prefix("cannot list claims: ", err) + } + hasDupes, err := hasDupes(claims.Claims) + if err != nil { + return errors.Prefix("error checking for duplicates: ", err) + } + if hasDupes { + return errors.Err("channel has duplicates! Manual fix required") + } + pubsOnWallet, err := publishesCount(claims.Claims) + if err != nil { + return errors.Prefix("error counting claims: ", err) + } + pubsOnDB := 0 + for _, sv := range s.syncedVideos { + if sv.Published { + pubsOnDB++ + } + } + if pubsOnWallet > pubsOnDB { + return errors.Err("not all published videos are in the database") + } + if pubsOnWallet < pubsOnDB { + SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.lbryChannelID) + } err = s.walletSetup() if err != nil { return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err) @@ -379,10 +441,10 @@ func (s *Sync) doSync() error { for i := 0; i < s.ConcurrentVideos; i++ { s.grp.Add(1) - go func() { + go func(i int) { defer s.grp.Done() s.startWorker(i) - }() + }(i) } if s.LbryChannelName == "@UCBerkeley" { -- 2.45.2 From a0565e809e8512cf4e16e06fec3d17466738821d Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 29 Aug 2018 18:34:30 -0400 Subject: [PATCH 6/8] switch to MP4 over webm --- ytsync/sources/youtubeVideo.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 79ad52e..b693682 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -128,8 +128,7 @@ func (v *YoutubeVideo) download() error { } defer downloadedFile.Close() - - return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile) + return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile) } func (v *YoutubeVideo) videoDir() string { -- 2.45.2 From 2cd2bb6d69756be1343b063abeee823d4a9ca5d3 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 31 Aug 2018 12:22:49 -0400 Subject: [PATCH 7/8] adjust bugs --- jsonrpc/daemon_types.go | 4 +--- ytsync/ytsync.go | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/jsonrpc/daemon_types.go b/jsonrpc/daemon_types.go index d1177d3..84212ee 100644 --- a/jsonrpc/daemon_types.go +++ b/jsonrpc/daemon_types.go @@ -241,9 +241,7 @@ type ClaimListResponse struct { LastTakeoverHeight int `json:"last_takeover_height"` SupportsWithoutClaims []Support `json:"supports_without_claims"` } -type ClaimListMineResponse struct { - Claims []Claim `json:"claims"` -} +type ClaimListMineResponse []Claim type ClaimShowResponse Claim type PeerListResponsePeer struct { diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 75ee593..9ece285 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -407,14 +407,14 @@ func (s *Sync) doSync() error { if err != nil { return errors.Prefix("cannot list claims: ", err) } - hasDupes, err := hasDupes(claims.Claims) + hasDupes, err := hasDupes(*claims) if err != nil { return errors.Prefix("error checking for duplicates: ", err) } if hasDupes { return errors.Err("channel has duplicates! Manual fix required") } - pubsOnWallet, err := publishesCount(claims.Claims) + pubsOnWallet, err := publishesCount(*claims) if err != nil { return errors.Prefix("error counting claims: ", err) } @@ -428,7 +428,7 @@ func (s *Sync) doSync() error { return errors.Err("not all published videos are in the database") } if pubsOnWallet < pubsOnDB { - SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.lbryChannelID) + SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) } err = s.walletSetup() if err != nil { -- 2.45.2 From a673f05d20db3ce74133f409615d4d11255734ef Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 5 Sep 2018 13:54:33 -0400 Subject: [PATCH 8/8] address memory leak --- ytsync/sources/youtubeVideo.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index b693682..19a5620 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -123,11 +123,11 @@ func (v *YoutubeVideo) download() error { var downloadedFile *os.File downloadedFile, err = os.Create(videoPath) + defer downloadedFile.Close() if err != nil { return err } - defer downloadedFile.Close() return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile) } @@ -171,17 +171,17 @@ func (v *YoutubeVideo) triggerThumbnailSave() error { } var decoded struct { - error int `json:"error"` - url string `json:"url,omitempty"` - message string `json:"message,omitempty"` + Error int `json:"error"` + Url string `json:"url,omitempty"` + Message string `json:"message,omitempty"` } err = json.Unmarshal(contents, &decoded) if err != nil { return err } - if decoded.error != 0 { - return errors.Err("error creating thumbnail: " + decoded.message) + if decoded.Error != 0 { + return errors.Err("error creating thumbnail: " + decoded.Message) } return nil -- 2.45.2