diff --git a/jsonrpc/daemon.go b/jsonrpc/daemon.go index 27bdfba..07d7374 100644 --- a/jsonrpc/daemon.go +++ b/jsonrpc/daemon.go @@ -456,3 +456,15 @@ func (d *Client) NumClaimsInChannel(url string) (uint64, error) { } return channel.ClaimsInChannel, nil } + +func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) { + response := new(ClaimListMineResponse) + err := d.call(response, "claim_list_mine", map[string]interface{}{}) + if err != nil { + return nil, err + } else if response == nil { + return nil, errors.Err("no response") + } + + return response, nil +} diff --git a/jsonrpc/daemon_types.go b/jsonrpc/daemon_types.go index 6b03286..84212ee 100644 --- a/jsonrpc/daemon_types.go +++ b/jsonrpc/daemon_types.go @@ -31,25 +31,32 @@ type Support struct { } type Claim struct { - Address string `json:"address"` - Amount decimal.Decimal `json:"amount"` - ClaimID string `json:"claim_id"` - ClaimSequence int `json:"claim_sequence"` - DecodedClaim bool `json:"decoded_claim"` - Depth int `json:"depth"` - EffectiveAmount decimal.Decimal `json:"effective_amount"` - Height int `json:"height"` - Hex string `json:"hex"` - Name string `json:"name"` - Nout int `json:"nout"` - Supports []Support `json:"supports"` - Txid string `json:"txid"` - ValidAtHeight int `json:"valid_at_height"` - Value lbryschema.Claim `json:"value"` - Error *string `json:"error,omitempty"` - ChannelName *string `json:"channel_name,omitempty"` - HasSignature *bool `json:"has_signature,omitempty"` - SignatureIsValid *bool `json:"signature_is_valid,omitempty"` + Address string `json:"address"` + Amount decimal.Decimal `json:"amount"` + BlocksToExpiration int `json:"blocks_to_expiration"` + Category string `json:"category"` + ClaimID string `json:"claim_id"` + ClaimSequence int `json:"claim_sequence"` + Confirmations int `json:"confirmations"` + DecodedClaim bool `json:"decoded_claim"` + Depth int `json:"depth"` + EffectiveAmount decimal.Decimal `json:"effective_amount"` + ExpirationHeight int `json:"expiration_height"` + Expired bool `json:"expired"` + Height int `json:"height"` + Hex string `json:"hex"` + IsSpent bool `json:"is_spent"` + Name string `json:"name"` + Nout int `json:"nout"` + PermanentUrl string `json:"permanent_url"` + Supports []Support `json:"supports"` + Txid string `json:"txid"` + ValidAtHeight int `json:"valid_at_height"` + Value lbryschema.Claim `json:"value"` + Error *string `json:"error,omitempty"` + ChannelName *string `json:"channel_name,omitempty"` + HasSignature *bool `json:"has_signature,omitempty"` + SignatureIsValid *bool `json:"signature_is_valid,omitempty"` } type File struct { @@ -234,7 +241,7 @@ type ClaimListResponse struct { LastTakeoverHeight int `json:"last_takeover_height"` SupportsWithoutClaims []Support `json:"supports_without_claims"` } - +type ClaimListMineResponse []Claim type ClaimShowResponse Claim type PeerListResponsePeer struct { diff --git a/ytsync/manager.go b/ytsync/manager.go index 8aa8b1a..f923f4f 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -70,7 +70,7 @@ type apiYoutubeChannel struct { SyncServer null.String `json:"sync_server"` } -func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { +func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) { endpoint := s.ApiURL + "/yt/jobs" res, _ := http.PostForm(endpoint, url.Values{ "auth_token": {s.ApiToken}, @@ -105,35 +105,41 @@ type syncedVideo struct { VideoID string `json:"video_id"` Published bool `json:"published"` FailureReason string `json:"failure_reason"` + ClaimName string `json:"claim_name"` } -func (s SyncManager) setChannelStatus(channelID string, status string) (map[string]syncedVideo, error) { +func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) { endpoint := s.ApiURL + "/yt/channel_status" - + if len(failureReason) > maxReasonLength { + failureReason = failureReason[:maxReasonLength] + } res, _ := http.PostForm(endpoint, url.Values{ - "channel_id": {channelID}, - "sync_server": {s.HostName}, - "auth_token": {s.ApiToken}, - "sync_status": {status}, + "channel_id": {channelID}, + "sync_server": {s.HostName}, + "auth_token": {s.ApiToken}, + "sync_status": {status}, + "failure_reason": {failureReason}, }) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) var response apiChannelStatusResponse err := json.Unmarshal(body, &response) if err != nil { - return nil, err + return nil, nil, err } if !response.Error.IsNull() { - return nil, errors.Err(response.Error.String) + return nil, nil, errors.Err(response.Error.String) } if response.Data != nil { svs := make(map[string]syncedVideo) + claimNames := make(map[string]bool) for _, v := range response.Data { svs[v.VideoID] = v + claimNames[v.ClaimName] = v.Published } - return svs, nil + return svs, claimNames, nil } - return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) + return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) } const ( @@ -141,9 +147,11 @@ const ( VideoStatusFailed = "failed" ) -func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string) error { +func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { endpoint := s.ApiURL + "/yt/video_status" - + if len(failureReason) > maxReasonLength { + failureReason = failureReason[:maxReasonLength] + } vals := url.Values{ "youtube_channel_id": {channelID}, "video_id": {videoID}, @@ -157,12 +165,11 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10)) vals.Add("claim_id", claimID) vals.Add("claim_name", claimName) + if size != nil { + vals.Add("size", strconv.FormatInt(*size, 10)) + } } if failureReason != "" { - maxReasonLength := 500 - if len(failureReason) > maxReasonLength { - failureReason = failureReason[:500] - } vals.Add("failure_reason", failureReason) } res, _ := http.PostForm(endpoint, vals) @@ -186,7 +193,7 @@ func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status st return errors.Err("invalid API response. Status code: %d", res.StatusCode) } -func (s SyncManager) Start() error { +func (s *SyncManager) Start() error { syncCount := 0 for { err := s.checkUsedSpace() @@ -220,7 +227,7 @@ func (s SyncManager) Start() error { ConcurrentVideos: s.ConcurrentVideos, TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, - Manager: &s, + Manager: s, LbrycrdString: s.LbrycrdString, AwsS3ID: s.AwsS3ID, AwsS3Secret: s.AwsS3Secret, @@ -255,7 +262,7 @@ func (s SyncManager) Start() error { ConcurrentVideos: s.ConcurrentVideos, TakeOverExistingChannel: s.TakeOverExistingChannel, Refill: s.Refill, - Manager: &s, + Manager: s, LbrycrdString: s.LbrycrdString, AwsS3ID: s.AwsS3ID, AwsS3Secret: s.AwsS3Secret, @@ -305,11 +312,11 @@ func (s SyncManager) Start() error { return nil } -func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { +func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName) } -func (s SyncManager) checkUsedSpace() error { +func (s *SyncManager) checkUsedSpace() error { usedPctile, err := GetUsedSpace(s.BlobsDir) if err != nil { return err diff --git a/ytsync/setup.go b/ytsync/setup.go index afaa0af..6d72a03 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error { return nil } - s.syncedVideosMux.Lock() + s.syncedVideosMux.RLock() numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... - s.syncedVideosMux.Unlock() + s.syncedVideosMux.RUnlock() log.Debugf("We already published %d videos", numPublished) if numOnSource-numPublished > s.Manager.VideosLimit { diff --git a/ytsync/sources/shared.go b/ytsync/sources/shared.go index 5e1515a..ba5be9e 100644 --- a/ytsync/sources/shared.go +++ b/ytsync/sources/shared.go @@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string { return name + suffix } -var publishedNamesMutex sync.RWMutex -var publishedNames = map[string]bool{} - -func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) { +func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { attempt := 0 for { attempt++ name := getClaimNameFromTitle(title, attempt) - publishedNamesMutex.RLock() - _, exists := publishedNames[name] - publishedNamesMutex.RUnlock() + syncedVideosMux.Lock() + _, exists := claimNames[name] if exists { - log.Printf("name exists, retrying (%d attempts so far)\n", attempt) + log.Printf("name exists, retrying (%d attempts so far)", attempt) + syncedVideosMux.Unlock() continue } + claimNames[name] = false + syncedVideosMux.Unlock() + //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash if len(name) < 2 { hasher := md5.New() @@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string response, err := daemon.Publish(name, filename, amount, options) if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { - publishedNamesMutex.Lock() - publishedNames[name] = true - publishedNamesMutex.Unlock() + syncedVideosMux.Lock() + claimNames[name] = true + syncedVideosMux.Unlock() if err == nil { return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil } else { - log.Printf("name exists, retrying (%d attempts so far)\n", attempt) + log.Printf("name exists, retrying (%d attempts so far)", attempt) continue } } else { diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go index f7ead6a..3819033 100644 --- a/ytsync/sources/ucbVideo.go +++ b/ytsync/sources/ucbVideo.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "sync" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -20,17 +22,19 @@ import ( ) type ucbVideo struct { - id string - title string - channel string - description string - publishedAt time.Time - dir string + id string + title string + channel string + description string + publishedAt time.Time + dir string + claimNames map[string]bool + syncedVideosMux *sync.RWMutex } -func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo { +func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo { p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors - return ucbVideo{ + return &ucbVideo{ id: id, title: title, description: description, @@ -40,19 +44,19 @@ func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVi } } -func (v ucbVideo) ID() string { +func (v *ucbVideo) ID() string { return v.id } -func (v ucbVideo) PlaylistPosition() int { +func (v *ucbVideo) PlaylistPosition() int { return 0 } -func (v ucbVideo) IDAndNum() string { +func (v *ucbVideo) IDAndNum() string { return v.ID() + " (?)" } -func (v ucbVideo) PublishedAt() time.Time { +func (v *ucbVideo) PublishedAt() time.Time { return v.publishedAt //r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`) //matches := r.FindStringSubmatch(v.title) @@ -65,11 +69,11 @@ func (v ucbVideo) PublishedAt() time.Time { //return time.Now() } -func (v ucbVideo) getFilename() string { +func (v *ucbVideo) getFilename() string { return v.dir + "/" + v.id + ".mp4" } -func (v ucbVideo) getClaimName(attempt int) string { +func (v *ucbVideo) getClaimName(attempt int) string { reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) suffix := "" if attempt > 1 { @@ -98,7 +102,7 @@ func (v ucbVideo) getClaimName(attempt int) string { return name + suffix } -func (v ucbVideo) getAbbrevDescription() string { +func (v *ucbVideo) getAbbrevDescription() string { maxLines := 10 description := strings.TrimSpace(v.description) if strings.Count(description, "\n") < maxLines { @@ -107,7 +111,7 @@ func (v ucbVideo) getAbbrevDescription() string { return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." } -func (v ucbVideo) download() error { +func (v *ucbVideo) download() error { videoPath := v.getFilename() _, err := os.Stat(videoPath) @@ -146,7 +150,7 @@ func (v ucbVideo) download() error { return nil } -func (v ucbVideo) saveThumbnail() error { +func (v *ucbVideo) saveThumbnail() error { resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id) if err != nil { return err @@ -170,7 +174,7 @@ func (v ucbVideo) saveThumbnail() error { return err } -func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { options := jsonrpc.PublishOptions{ Title: &v.title, Author: strPtr("UC Berkeley"), @@ -183,10 +187,16 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl ChangeAddress: &claimAddress, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) } -func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *ucbVideo) Size() *int64 { + return nil +} + +func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { + v.claimNames = claimNames + v.syncedVideosMux = syncedVideosMux //download and thumbnail can be done in parallel err := v.download() if err != nil { diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 9fb091e..19a5620 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "sync" + "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" @@ -25,13 +27,16 @@ type YoutubeVideo struct { title string description string playlistPosition int64 + size *int64 publishedAt time.Time dir string + claimNames map[string]bool + syncedVideosMux *sync.RWMutex } -func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) YoutubeVideo { +func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo { publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors - return YoutubeVideo{ + return &YoutubeVideo{ id: snippet.ResourceId.VideoId, title: snippet.Title, description: snippet.Description, @@ -42,23 +47,23 @@ func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) You } } -func (v YoutubeVideo) ID() string { +func (v *YoutubeVideo) ID() string { return v.id } -func (v YoutubeVideo) PlaylistPosition() int { +func (v *YoutubeVideo) PlaylistPosition() int { return int(v.playlistPosition) } -func (v YoutubeVideo) IDAndNum() string { +func (v *YoutubeVideo) IDAndNum() string { return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)" } -func (v YoutubeVideo) PublishedAt() time.Time { +func (v *YoutubeVideo) PublishedAt() time.Time { return v.publishedAt } -func (v YoutubeVideo) getFilename() string { +func (v *YoutubeVideo) getFilename() string { maxLen := 30 reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) @@ -85,7 +90,7 @@ func (v YoutubeVideo) getFilename() string { return v.videoDir() + "/" + name + ".mp4" } -func (v YoutubeVideo) getAbbrevDescription() string { +func (v *YoutubeVideo) getAbbrevDescription() string { maxLines := 10 description := strings.TrimSpace(v.description) if strings.Count(description, "\n") < maxLines { @@ -94,7 +99,7 @@ func (v YoutubeVideo) getAbbrevDescription() string { return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." } -func (v YoutubeVideo) download() error { +func (v *YoutubeVideo) download() error { videoPath := v.getFilename() err := os.Mkdir(v.videoDir(), 0750) @@ -118,20 +123,19 @@ func (v YoutubeVideo) download() error { var downloadedFile *os.File downloadedFile, err = os.Create(videoPath) + defer downloadedFile.Close() if err != nil { return err } - defer downloadedFile.Close() - - return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile) + return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile) } -func (v YoutubeVideo) videoDir() string { +func (v *YoutubeVideo) videoDir() string { return v.dir + "/" + v.id } -func (v YoutubeVideo) delete() error { +func (v *YoutubeVideo) delete() error { videoPath := v.getFilename() err := os.Remove(videoPath) if err != nil { @@ -142,7 +146,7 @@ func (v YoutubeVideo) delete() error { return nil } -func (v YoutubeVideo) triggerThumbnailSave() error { +func (v *YoutubeVideo) triggerThumbnailSave() error { client := &http.Client{Timeout: 30 * time.Second} params, err := json.Marshal(map[string]string{"videoid": v.id}) @@ -167,17 +171,17 @@ func (v YoutubeVideo) triggerThumbnailSave() error { } var decoded struct { - error int `json:"error"` - url string `json:"url,omitempty"` - message string `json:"message,omitempty"` + Error int `json:"error"` + Url string `json:"url,omitempty"` + Message string `json:"message,omitempty"` } err = json.Unmarshal(contents, &decoded) if err != nil { return err } - if decoded.error != 0 { - return errors.Err("error creating thumbnail: " + decoded.message) + if decoded.Error != 0 { + return errors.Err("error creating thumbnail: " + decoded.Message) } return nil @@ -185,7 +189,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error { func strPtr(s string) *string { return &s } -func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { if channelID == "" { return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? } @@ -200,10 +204,16 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun ChangeAddress: &claimAddress, ChannelID: &channelID, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) } -func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *YoutubeVideo) Size() *int64 { + return v.size +} + +func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { + v.claimNames = claimNames + v.syncedVideosMux = syncedVideosMux //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -215,7 +225,10 @@ func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount f if err != nil { return nil, err } - if fi.Size() > int64(maxVideoSize)*1024*1024 { + videoSize := fi.Size() + v.size = &videoSize + + if videoSize > int64(maxVideoSize)*1024*1024 { //delete the video and ignore the error _ = v.delete() return nil, errors.Err("the video is too big to sync, skipping for now") diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 2df409d..9ece285 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -38,14 +38,16 @@ import ( const ( channelClaimAmount = 0.01 publishAmount = 0.01 + maxReasonLength = 500 ) type video interface { + Size() *int64 ID() string IDAndNum() string PlaylistPosition() int PublishedAt() time.Time - Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error) + Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error) } // sorting videos @@ -76,8 +78,9 @@ type Sync struct { claimAddress string videoDirectory string db *redisdb.DB + syncedVideosMux *sync.RWMutex syncedVideos map[string]syncedVideo - syncedVideosMux *sync.Mutex + claimNames map[string]bool grp *stop.Group lbryChannelID string @@ -85,7 +88,7 @@ type Sync struct { queue chan video } -func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { +func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { s.syncedVideosMux.Lock() defer s.syncedVideosMux.Unlock() s.syncedVideos[videoID] = syncedVideo{ @@ -93,6 +96,9 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s Published: published, FailureReason: failureReason, } + if claimName != "" { + s.claimNames[claimName] = true + } } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -223,7 +229,7 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - s.syncedVideosMux = &sync.Mutex{} + s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.Mutex{} s.db = redisdb.New() s.grp = stop.New() @@ -236,12 +242,13 @@ func (s *Sync) FullCycle() (e error) { log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() }() - syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing) + syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") if err != nil { return err } s.syncedVideosMux.Lock() s.syncedVideos = syncedVideos + s.claimNames = claimNames s.syncedVideosMux.Unlock() defer s.updateChannelStatus(&e) @@ -298,14 +305,15 @@ func (s *Sync) updateChannelStatus(e *error) { if util.SubstringInSlice((*e).Error(), noFailConditions) { return } - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed) + failureReason := (*e).Error() + _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) *e = errors.Prefix(err.Error(), *e) } } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced) + _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") if err != nil { *e = err } @@ -343,7 +351,7 @@ func (s *Sync) stopAndUploadWallet(e *error) { err := s.uploadWallet() if err != nil { if *e == nil { - e = &err + e = &err //not 100% sure return } else { *e = errors.Prefix("failure uploading wallet: ", *e) @@ -357,9 +365,71 @@ func logShutdownError(shutdownErr error) { SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") } +func hasDupes(claims []jsonrpc.Claim) (bool, error) { + videoIDs := make(map[string]interface{}) + for _, c := range claims { + if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { + continue + } + if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { + return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID) + } + tn := *c.Value.Stream.Metadata.Thumbnail + videoID := tn[:strings.LastIndex(tn, "/")+1] + _, ok := videoIDs[videoID] + if !ok { + videoIDs[videoID] = nil + continue + } + return true, nil + } + return false, nil +} + +//publishesCount counts the amount of videos published so far +func publishesCount(claims []jsonrpc.Claim) (int, error) { + count := 0 + for _, c := range claims { + if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { + continue + } + if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { + return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID) + } + count++ + } + return count, nil +} + func (s *Sync) doSync() error { var err error - + claims, err := s.daemon.ClaimListMine() + if err != nil { + return errors.Prefix("cannot list claims: ", err) + } + hasDupes, err := hasDupes(*claims) + if err != nil { + return errors.Prefix("error checking for duplicates: ", err) + } + if hasDupes { + return errors.Err("channel has duplicates! Manual fix required") + } + pubsOnWallet, err := publishesCount(*claims) + if err != nil { + return errors.Prefix("error counting claims: ", err) + } + pubsOnDB := 0 + for _, sv := range s.syncedVideos { + if sv.Published { + pubsOnDB++ + } + } + if pubsOnWallet > pubsOnDB { + return errors.Err("not all published videos are in the database") + } + if pubsOnWallet < pubsOnDB { + SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + } err = s.walletSetup() if err != nil { return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err) @@ -371,10 +441,10 @@ func (s *Sync) doSync() error { for i := 0; i < s.ConcurrentVideos; i++ { s.grp.Add(1) - go func() { + go func(i int) { defer s.grp.Done() s.startWorker(i) - }() + }(i) } if s.LbryChannelName == "@UCBerkeley" { @@ -469,8 +539,8 @@ func (s *Sync) startWorker(workerNum int) { } SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } - s.AppendSyncedVideo(v.ID(), false, err.Error()) - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error()) + s.AppendSyncedVideo(v.ID(), false, err.Error(), "") + err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } @@ -627,9 +697,9 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - s.syncedVideosMux.Lock() + s.syncedVideosMux.RLock() sv, ok := s.syncedVideos[v.ID()] - s.syncedVideosMux.Unlock() + s.syncedVideosMux.RUnlock() alreadyPublished := ok && sv.Published neverRetryFailures := []string{ @@ -667,15 +737,16 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { return err } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux) if err != nil { return err } - err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "") + err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) if err != nil { - return err + SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - s.AppendSyncedVideo(v.ID(), true, "") + + s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName) return nil }