grin/niko coworking changes

This commit is contained in:
Alex Grintsvayg 2018-09-18 15:20:34 -04:00 committed by Niko Storni
parent 363726659f
commit 5ec5412191
6 changed files with 113 additions and 84 deletions

View file

@ -35,6 +35,13 @@ type SyncManager struct {
SingleRun bool
ChannelProperties *sdk.ChannelProperties
APIConfig *sdk.APIConfig
namer *Namer
}
func NewSyncManager() *SyncManager {
return &SyncManager{
namer: NewNamer(),
}
}
const (
@ -85,6 +92,11 @@ const (
)
func (s *SyncManager) Start() error {
if s.namer == nil {
// TODO: fix me, use NewSyncManager instead
s.namer = NewNamer()
}
syncCount := 0
for {
err := s.checkUsedSpace()
@ -121,6 +133,7 @@ func (s *SyncManager) Start() error {
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
namer: s.namer,
}
shouldInterruptLoop = true
} else {

78
names.go Normal file
View file

@ -0,0 +1,78 @@
package ytsync
import (
"crypto/md5"
"encoding/hex"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
)
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
type Namer struct {
mu *sync.Mutex
names map[string]bool
}
func NewNamer() *Namer {
return &Namer{
mu: &sync.Mutex{},
names: make(map[string]bool),
}
}
func (n *Namer) GetNextName(prefix string) string {
n.mu.Lock()
defer n.mu.Unlock()
attempt := 1
var name string
for {
name = getClaimNameFromTitle(prefix, attempt)
if _, exists := n.names[name]; !exists {
break
}
attempt++
}
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
name = fmt.Sprintf("%s-%d", hex.EncodeToString(md5.Sum([]byte(prefix))[:])[:15], attempt)
}
n.names[name] = true
return name
}
// TODO: clean this up some
func getClaimNameFromTitle(title string, attempt int) string {
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}

View file

@ -1,90 +1,27 @@
package sources
import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"crypto/md5"
"encoding/hex"
"github.com/lbryio/lbry.go/jsonrpc"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/ytsync"
)
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
type SyncSummary struct {
ClaimID string
ClaimName string
}
func getClaimNameFromTitle(title string, attempt int) string {
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
attempt := 0
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *ytsync.Namer) (*SyncSummary, error) {
for {
attempt++
name := getClaimNameFromTitle(title, attempt)
syncedVideosMux.Lock()
_, exists := claimNames[name]
if exists {
log.Printf("name exists, retrying (%d attempts so far)", attempt)
syncedVideosMux.Unlock()
continue
}
claimNames[name] = false
syncedVideosMux.Unlock()
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
hasher := md5.New()
hasher.Write([]byte(title))
name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt)
}
name := namer.GetNextName(title)
response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
syncedVideosMux.Lock()
claimNames[name] = true
syncedVideosMux.Unlock()
if err == nil {
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else {
log.Printf("name exists, retrying (%d attempts so far)", attempt)
if err != nil {
if strings.Contains(err.Error(), "failed: Multiple claims (") {
continue
}
} else {
return nil, err
}
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
}
}

View file

@ -6,15 +6,15 @@ import (
"regexp"
"strconv"
"strings"
"time"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@ -174,7 +174,7 @@ func (v *ucbVideo) saveThumbnail() error {
return err
}
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *ytsync.Namer) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: strPtr("UC Berkeley"),
@ -187,7 +187,7 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f
ChangeAddress: &claimAddress,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
}
func (v *ucbVideo) Size() *int64 {

View file

@ -9,12 +9,12 @@ import (
"regexp"
"strconv"
"strings"
"time"
"sync"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync"
"github.com/rylio/ytdl"
log "github.com/sirupsen/logrus"
@ -234,7 +234,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s }
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *ytsync.Namer) (*SyncSummary, error) {
if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
}
@ -249,18 +249,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
ChangeAddress: &claimAddress,
ChannelID: &channelID,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
}
func (v *YoutubeVideo) Size() *int64 {
return v.size
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *ytsync.Namer) (*SyncSummary, error) {
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
@ -274,7 +272,7 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
}
log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
//delete the video in all cases (and ignore the error)
_ = v.delete()
if err != nil {

View file

@ -46,7 +46,7 @@ type video interface {
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
Sync(*jsonrpc.Client, string, float64, string, int, *Namer) (*sources.SyncSummary, error)
}
// sorting videos
@ -81,6 +81,7 @@ type Sync struct {
claimNames map[string]bool
grp *stop.Group
lbryChannelID string
namer *Namer
walletMux *sync.Mutex
queue chan video
@ -778,10 +779,12 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil {
return err
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.namer)
if err != nil {
return err
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())