diff --git a/manager.go b/manager.go index 3eed3b5..f923f4f 100644 --- a/manager.go +++ b/manager.go @@ -105,9 +105,10 @@ type syncedVideo struct { VideoID string `json:"video_id"` Published bool `json:"published"` FailureReason string `json:"failure_reason"` + ClaimName string `json:"claim_name"` } -func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, error) { +func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) { endpoint := s.ApiURL + "/yt/channel_status" if len(failureReason) > maxReasonLength { failureReason = failureReason[:maxReasonLength] @@ -124,19 +125,21 @@ func (s *SyncManager) setChannelStatus(channelID string, status string, failureR var response apiChannelStatusResponse err := json.Unmarshal(body, &response) if err != nil { - return nil, err + return nil, nil, err } if !response.Error.IsNull() { - return nil, errors.Err(response.Error.String) + return nil, nil, errors.Err(response.Error.String) } if response.Data != nil { svs := make(map[string]syncedVideo) + claimNames := make(map[string]bool) for _, v := range response.Data { svs[v.VideoID] = v + claimNames[v.ClaimName] = v.Published } - return svs, nil + return svs, claimNames, nil } - return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) + return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) } const ( diff --git a/setup.go b/setup.go index afaa0af..6d72a03 100644 --- a/setup.go +++ b/setup.go @@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error { return nil } - s.syncedVideosMux.Lock() + s.syncedVideosMux.RLock() numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... - s.syncedVideosMux.Unlock() + s.syncedVideosMux.RUnlock() log.Debugf("We already published %d videos", numPublished) if numOnSource-numPublished > s.Manager.VideosLimit { diff --git a/sources/shared.go b/sources/shared.go index 5e1515a..ba5be9e 100644 --- a/sources/shared.go +++ b/sources/shared.go @@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string { return name + suffix } -var publishedNamesMutex sync.RWMutex -var publishedNames = map[string]bool{} - -func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) { +func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { attempt := 0 for { attempt++ name := getClaimNameFromTitle(title, attempt) - publishedNamesMutex.RLock() - _, exists := publishedNames[name] - publishedNamesMutex.RUnlock() + syncedVideosMux.Lock() + _, exists := claimNames[name] if exists { - log.Printf("name exists, retrying (%d attempts so far)\n", attempt) + log.Printf("name exists, retrying (%d attempts so far)", attempt) + syncedVideosMux.Unlock() continue } + claimNames[name] = false + syncedVideosMux.Unlock() + //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash if len(name) < 2 { hasher := md5.New() @@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string response, err := daemon.Publish(name, filename, amount, options) if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { - publishedNamesMutex.Lock() - publishedNames[name] = true - publishedNamesMutex.Unlock() + syncedVideosMux.Lock() + claimNames[name] = true + syncedVideosMux.Unlock() if err == nil { return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil } else { - log.Printf("name exists, retrying (%d attempts so far)\n", attempt) + log.Printf("name exists, retrying (%d attempts so far)", attempt) continue } } else { diff --git a/sources/ucbVideo.go b/sources/ucbVideo.go index 0073007..3819033 100644 --- a/sources/ucbVideo.go +++ b/sources/ucbVideo.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "sync" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -20,12 +22,14 @@ import ( ) type ucbVideo struct { - id string - title string - channel string - description string - publishedAt time.Time - dir string + id string + title string + channel string + description string + publishedAt time.Time + dir string + claimNames map[string]bool + syncedVideosMux *sync.RWMutex } func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo { @@ -183,14 +187,16 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f ChangeAddress: &claimAddress, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) } func (v *ucbVideo) Size() *int64 { return nil } -func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { + v.claimNames = claimNames + v.syncedVideosMux = syncedVideosMux //download and thumbnail can be done in parallel err := v.download() if err != nil { diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 9b3e2fd..79ad52e 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "sync" + "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" @@ -28,6 +30,8 @@ type YoutubeVideo struct { size *int64 publishedAt time.Time dir string + claimNames map[string]bool + syncedVideosMux *sync.RWMutex } func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo { @@ -201,14 +205,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou ChangeAddress: &claimAddress, ChannelID: &channelID, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) } func (v *YoutubeVideo) Size() *int64 { return v.size } -func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { +func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { + v.claimNames = claimNames + v.syncedVideosMux = syncedVideosMux //download and thumbnail can be done in parallel err := v.download() if err != nil { diff --git a/ytsync.go b/ytsync.go index 9c0a31a..2c17059 100644 --- a/ytsync.go +++ b/ytsync.go @@ -47,7 +47,7 @@ type video interface { IDAndNum() string PlaylistPosition() int PublishedAt() time.Time - Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error) + Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error) } // sorting videos @@ -78,8 +78,9 @@ type Sync struct { claimAddress string videoDirectory string db *redisdb.DB + syncedVideosMux *sync.RWMutex syncedVideos map[string]syncedVideo - syncedVideosMux *sync.Mutex + claimNames map[string]bool grp *stop.Group lbryChannelID string @@ -87,7 +88,7 @@ type Sync struct { queue chan video } -func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { +func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { s.syncedVideosMux.Lock() defer s.syncedVideosMux.Unlock() s.syncedVideos[videoID] = syncedVideo{ @@ -95,6 +96,7 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s Published: published, FailureReason: failureReason, } + s.claimNames[claimName] = true } // SendErrorToSlack Sends an error message to the default channel and to the process log. @@ -225,7 +227,7 @@ func (s *Sync) FullCycle() (e error) { if s.YoutubeChannelID == "" { return errors.Err("channel ID not provided") } - s.syncedVideosMux = &sync.Mutex{} + s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.Mutex{} s.db = redisdb.New() s.grp = stop.New() @@ -238,12 +240,13 @@ func (s *Sync) FullCycle() (e error) { log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() }() - syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") + syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") if err != nil { return err } s.syncedVideosMux.Lock() s.syncedVideos = syncedVideos + s.claimNames = claimNames s.syncedVideosMux.Unlock() defer s.updateChannelStatus(&e) @@ -301,14 +304,14 @@ func (s *Sync) updateChannelStatus(e *error) { return } failureReason := (*e).Error() - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) + _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) err = errors.Prefix(msg, err) *e = errors.Prefix(err.Error(), *e) } } else if !s.IsInterrupted() { - _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") + _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") if err != nil { *e = err } @@ -472,7 +475,7 @@ func (s *Sync) startWorker(workerNum int) { } SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) } - s.AppendSyncedVideo(v.ID(), false, err.Error()) + s.AppendSyncedVideo(v.ID(), false, err.Error(), "") err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) @@ -630,9 +633,9 @@ func (s *Sync) processVideo(v video) (err error) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) - s.syncedVideosMux.Lock() + s.syncedVideosMux.RLock() sv, ok := s.syncedVideos[v.ID()] - s.syncedVideosMux.Unlock() + s.syncedVideosMux.RUnlock() alreadyPublished := ok && sv.Published neverRetryFailures := []string{ @@ -670,7 +673,7 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { return err } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux) if err != nil { return err } @@ -679,7 +682,7 @@ func (s *Sync) processVideo(v video) (err error) { SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) } - s.AppendSyncedVideo(v.ID(), true, "") + s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName) return nil }