From 5ec5412191e7f3659514de241525aa21108f01b6 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 18 Sep 2018 15:20:34 -0400 Subject: [PATCH] grin/niko coworking changes --- manager.go | 13 +++++++ names.go | 78 +++++++++++++++++++++++++++++++++++++++++ sources/shared.go | 75 ++++----------------------------------- sources/ucbVideo.go | 8 ++--- sources/youtubeVideo.go | 16 ++++----- ytsync.go | 7 ++-- 6 files changed, 113 insertions(+), 84 deletions(-) create mode 100644 names.go diff --git a/manager.go b/manager.go index d59df1e..1fb2f68 100644 --- a/manager.go +++ b/manager.go @@ -35,6 +35,13 @@ type SyncManager struct { SingleRun bool ChannelProperties *sdk.ChannelProperties APIConfig *sdk.APIConfig + namer *Namer +} + +func NewSyncManager() *SyncManager { + return &SyncManager{ + namer: NewNamer(), + } } const ( @@ -85,6 +92,11 @@ const ( ) func (s *SyncManager) Start() error { + if s.namer == nil { + // TODO: fix me, use NewSyncManager instead + s.namer = NewNamer() + } + syncCount := 0 for { err := s.checkUsedSpace() @@ -121,6 +133,7 @@ func (s *SyncManager) Start() error { AwsS3Secret: s.AwsS3Secret, AwsS3Region: s.AwsS3Region, AwsS3Bucket: s.AwsS3Bucket, + namer: s.namer, } shouldInterruptLoop = true } else { diff --git a/names.go b/names.go new file mode 100644 index 0000000..a9a397c --- /dev/null +++ b/names.go @@ -0,0 +1,78 @@ +package ytsync + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "regexp" + "strconv" + "strings" + "sync" +) + +var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`) + +type Namer struct { + mu *sync.Mutex + names map[string]bool +} + +func NewNamer() *Namer { + return &Namer{ + mu: &sync.Mutex{}, + names: make(map[string]bool), + } +} + +func (n *Namer) GetNextName(prefix string) string { + n.mu.Lock() + defer n.mu.Unlock() + + attempt := 1 + var name string + for { + name = getClaimNameFromTitle(prefix, attempt) + if _, exists := n.names[name]; !exists { + break + } + attempt++ + } + + //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash + if len(name) < 2 { + name = fmt.Sprintf("%s-%d", hex.EncodeToString(md5.Sum([]byte(prefix))[:])[:15], attempt) + } + + n.names[name] = true + + return name +} + +// TODO: clean this up some +func getClaimNameFromTitle(title string, attempt int) string { + suffix := "" + if attempt > 1 { + suffix = "-" + strconv.Itoa(attempt) + } + maxLen := 40 - len(suffix) + + chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-") + + name := chunks[0] + if len(name) > maxLen { + return name[:maxLen] + } + + for _, chunk := range chunks[1:] { + tmpName := name + "-" + chunk + if len(tmpName) > maxLen { + if len(name) < 20 { + name = tmpName[:maxLen] + } + break + } + name = tmpName + } + + return name + suffix +} diff --git a/sources/shared.go b/sources/shared.go index ba5be9e..bda462f 100644 --- a/sources/shared.go +++ b/sources/shared.go @@ -1,90 +1,27 @@ package sources import ( - "fmt" - "regexp" - "strconv" "strings" - "sync" - - "crypto/md5" - "encoding/hex" "github.com/lbryio/lbry.go/jsonrpc" - log "github.com/sirupsen/logrus" + "github.com/lbryio/lbry.go/ytsync" ) -var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`) - type SyncSummary struct { ClaimID string ClaimName string } -func getClaimNameFromTitle(title string, attempt int) string { - suffix := "" - if attempt > 1 { - suffix = "-" + strconv.Itoa(attempt) - } - maxLen := 40 - len(suffix) - - chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-") - - name := chunks[0] - if len(name) > maxLen { - return name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - - return name + suffix -} - -func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { - attempt := 0 +func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *ytsync.Namer) (*SyncSummary, error) { for { - attempt++ - name := getClaimNameFromTitle(title, attempt) - - syncedVideosMux.Lock() - _, exists := claimNames[name] - if exists { - log.Printf("name exists, retrying (%d attempts so far)", attempt) - syncedVideosMux.Unlock() - continue - } - claimNames[name] = false - syncedVideosMux.Unlock() - - //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash - if len(name) < 2 { - hasher := md5.New() - hasher.Write([]byte(title)) - name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt) - } - + name := namer.GetNextName(title) response, err := daemon.Publish(name, filename, amount, options) - if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { - syncedVideosMux.Lock() - claimNames[name] = true - syncedVideosMux.Unlock() - if err == nil { - return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil - } else { - log.Printf("name exists, retrying (%d attempts so far)", attempt) + if err != nil { + if strings.Contains(err.Error(), "failed: Multiple claims (") { continue } - } else { return nil, err } + return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil } } diff --git a/sources/ucbVideo.go b/sources/ucbVideo.go index 3819033..33578bd 100644 --- a/sources/ucbVideo.go +++ b/sources/ucbVideo.go @@ -6,15 +6,15 @@ import ( "regexp" "strconv" "strings" - "time" - "sync" + "time" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/ytsync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -174,7 +174,7 @@ func (v *ucbVideo) saveThumbnail() error { return err } -func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *ytsync.Namer) (*SyncSummary, error) { options := jsonrpc.PublishOptions{ Title: &v.title, Author: strPtr("UC Berkeley"), @@ -187,7 +187,7 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f ChangeAddress: &claimAddress, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer) } func (v *ucbVideo) Size() *int64 { diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index d67587d..0fc9b96 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -9,12 +9,12 @@ import ( "regexp" "strconv" "strings" - "time" - "sync" + "time" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" + "github.com/lbryio/lbry.go/ytsync" "github.com/rylio/ytdl" log "github.com/sirupsen/logrus" @@ -234,7 +234,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() error { func strPtr(s string) *string { return &s } -func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { +func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *ytsync.Namer) (*SyncSummary, error) { if channelID == "" { return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? } @@ -249,18 +249,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou ChangeAddress: &claimAddress, ChannelID: &channelID, } - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) + + return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer) } func (v *YoutubeVideo) Size() *int64 { return v.size } -func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { - v.claimNames = claimNames - v.syncedVideosMux = syncedVideosMux +func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *ytsync.Namer) (*SyncSummary, error) { v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 - //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -274,7 +272,7 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount } log.Debugln("Created thumbnail for " + v.id) - summary, err := v.publish(daemon, claimAddress, amount, channelID) + summary, err := v.publish(daemon, claimAddress, amount, channelID, namer) //delete the video in all cases (and ignore the error) _ = v.delete() if err != nil { diff --git a/ytsync.go b/ytsync.go index 59c3864..3ef05b7 100644 --- a/ytsync.go +++ b/ytsync.go @@ -46,7 +46,7 @@ type video interface { IDAndNum() string PlaylistPosition() int PublishedAt() time.Time - Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error) + Sync(*jsonrpc.Client, string, float64, string, int, *Namer) (*sources.SyncSummary, error) } // sorting videos @@ -81,6 +81,7 @@ type Sync struct { claimNames map[string]bool grp *stop.Group lbryChannelID string + namer *Namer walletMux *sync.Mutex queue chan video @@ -778,10 +779,12 @@ func (s *Sync) processVideo(v video) (err error) { if err != nil { return err } - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux) + + summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.namer) if err != nil { return err } + err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) if err != nil { SendErrorToSlack("Failed to mark video on the database: %s", err.Error())