detect name collisions based on api data

improve mutex usage
remove unnecessary line breaks
This commit is contained in:
Niko Storni 2018-08-22 18:28:31 -04:00
parent 83b0e40ec5
commit 06e9dd9a98
6 changed files with 59 additions and 41 deletions

View file

@ -105,9 +105,10 @@ type syncedVideo struct {
VideoID string `json:"video_id"` VideoID string `json:"video_id"`
Published bool `json:"published"` Published bool `json:"published"`
FailureReason string `json:"failure_reason"` FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
} }
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, error) { func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status" endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength { if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength] failureReason = failureReason[:maxReasonLength]
@ -124,19 +125,21 @@ func (s *SyncManager) setChannelStatus(channelID string, status string, failureR
var response apiChannelStatusResponse var response apiChannelStatusResponse
err := json.Unmarshal(body, &response) err := json.Unmarshal(body, &response)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if !response.Error.IsNull() { if !response.Error.IsNull() {
return nil, errors.Err(response.Error.String) return nil, nil, errors.Err(response.Error.String)
} }
if response.Data != nil { if response.Data != nil {
svs := make(map[string]syncedVideo) svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data { for _, v := range response.Data {
svs[v.VideoID] = v svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
} }
return svs, nil return svs, claimNames, nil
} }
return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
} }
const ( const (

View file

@ -45,9 +45,9 @@ func (s *Sync) walletSetup() error {
return nil return nil
} }
s.syncedVideosMux.Lock() s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.Unlock() s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already published %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit { if numOnSource-numPublished > s.Manager.VideosLimit {

View file

@ -49,22 +49,22 @@ func getClaimNameFromTitle(title string, attempt int) string {
return name + suffix return name + suffix
} }
var publishedNamesMutex sync.RWMutex func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
var publishedNames = map[string]bool{}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
attempt := 0 attempt := 0
for { for {
attempt++ attempt++
name := getClaimNameFromTitle(title, attempt) name := getClaimNameFromTitle(title, attempt)
publishedNamesMutex.RLock() syncedVideosMux.Lock()
_, exists := publishedNames[name] _, exists := claimNames[name]
publishedNamesMutex.RUnlock()
if exists { if exists {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt) log.Printf("name exists, retrying (%d attempts so far)", attempt)
syncedVideosMux.Unlock()
continue continue
} }
claimNames[name] = false
syncedVideosMux.Unlock()
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 { if len(name) < 2 {
hasher := md5.New() hasher := md5.New()
@ -74,13 +74,13 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
response, err := daemon.Publish(name, filename, amount, options) response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
publishedNamesMutex.Lock() syncedVideosMux.Lock()
publishedNames[name] = true claimNames[name] = true
publishedNamesMutex.Unlock() syncedVideosMux.Unlock()
if err == nil { if err == nil {
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else { } else {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt) log.Printf("name exists, retrying (%d attempts so far)", attempt)
continue continue
} }
} else { } else {

View file

@ -8,6 +8,8 @@ import (
"strings" "strings"
"time" "time"
"sync"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -20,12 +22,14 @@ import (
) )
type ucbVideo struct { type ucbVideo struct {
id string id string
title string title string
channel string channel string
description string description string
publishedAt time.Time publishedAt time.Time
dir string dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
} }
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo { func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
@ -183,14 +187,16 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
} }
func (v *ucbVideo) Size() *int64 { func (v *ucbVideo) Size() *int64 {
return nil return nil
} }
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {

View file

@ -11,6 +11,8 @@ import (
"strings" "strings"
"time" "time"
"sync"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
@ -28,6 +30,8 @@ type YoutubeVideo struct {
size *int64 size *int64
publishedAt time.Time publishedAt time.Time
dir string dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
} }
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo { func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
@ -201,14 +205,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
ChannelID: &channelID, ChannelID: &channelID,
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
} }
func (v *YoutubeVideo) Size() *int64 { func (v *YoutubeVideo) Size() *int64 {
return v.size return v.size
} }
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int) (*SyncSummary, error) { func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {

View file

@ -47,7 +47,7 @@ type video interface {
IDAndNum() string IDAndNum() string
PlaylistPosition() int PlaylistPosition() int
PublishedAt() time.Time PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int) (*sources.SyncSummary, error) Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
} }
// sorting videos // sorting videos
@ -78,8 +78,9 @@ type Sync struct {
claimAddress string claimAddress string
videoDirectory string videoDirectory string
db *redisdb.DB db *redisdb.DB
syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo syncedVideos map[string]syncedVideo
syncedVideosMux *sync.Mutex claimNames map[string]bool
grp *stop.Group grp *stop.Group
lbryChannelID string lbryChannelID string
@ -87,7 +88,7 @@ type Sync struct {
queue chan video queue chan video
} }
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock() defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{ s.syncedVideos[videoID] = syncedVideo{
@ -95,6 +96,7 @@ func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason s
Published: published, Published: published,
FailureReason: failureReason, FailureReason: failureReason,
} }
s.claimNames[claimName] = true
} }
// SendErrorToSlack Sends an error message to the default channel and to the process log. // SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -225,7 +227,7 @@ func (s *Sync) FullCycle() (e error) {
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided") return errors.Err("channel ID not provided")
} }
s.syncedVideosMux = &sync.Mutex{} s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{} s.walletMux = &sync.Mutex{}
s.db = redisdb.New() s.db = redisdb.New()
s.grp = stop.New() s.grp = stop.New()
@ -238,12 +240,13 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop() s.grp.Stop()
}() }()
syncedVideos, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock() s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e) defer s.updateChannelStatus(&e)
@ -301,14 +304,14 @@ func (s *Sync) updateChannelStatus(e *error) {
return return
} }
failureReason := (*e).Error() failureReason := (*e).Error()
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e) *e = errors.Prefix(err.Error(), *e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") _, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil { if err != nil {
*e = err *e = err
} }
@ -472,7 +475,7 @@ func (s *Sync) startWorker(workerNum int) {
} }
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
s.AppendSyncedVideo(v.ID(), false, err.Error()) s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
@ -630,9 +633,9 @@ func (s *Sync) processVideo(v video) (err error) {
log.Println(v.ID() + " took " + time.Since(start).String()) log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now()) }(time.Now())
s.syncedVideosMux.Lock() s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()] sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.Unlock() s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published alreadyPublished := ok && sv.Published
neverRetryFailures := []string{ neverRetryFailures := []string{
@ -670,7 +673,7 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil { if err != nil {
return err return err
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize) summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
if err != nil { if err != nil {
return err return err
} }
@ -679,7 +682,7 @@ func (s *Sync) processVideo(v video) (err error) {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
s.AppendSyncedVideo(v.ID(), true, "") s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
return nil return nil
} }