diff --git a/setup.go b/setup.go
index b55a04e..847dc52 100644
--- a/setup.go
+++ b/setup.go
@@ -27,9 +27,14 @@ func (s *Sync) walletSetup() error {
 	balance := decimal.Decimal(*balanceResp)
 	log.Debugf("Starting balance is %s", balance.String())
 
-	numOnSource, err := s.CountVideos()
-	if err != nil {
-		return err
+	var numOnSource uint64
+	if s.LbryChannelName == "@UCBerkeley" {
+		numOnSource = 10104
+	} else {
+		numOnSource, err = s.CountVideos()
+		if err != nil {
+			return err
+		}
 	}
 
 	log.Debugf("Source channel has %d videos", numOnSource)
diff --git a/sources/shared.go b/sources/shared.go
new file mode 100644
index 0000000..f70ebbc
--- /dev/null
+++ b/sources/shared.go
@@ -0,0 +1,99 @@
+package sources
+
+import (
+	"regexp"
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/lbryio/lbry.go/jsonrpc"
+	log "github.com/sirupsen/logrus"
+)
+
+// titleRegexp matches each run of characters that are not ASCII letters or digits.
+var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
+
+// getClaimNameFromTitle derives a lowercase, dash-separated claim name of at
+// most 40 bytes from title. For attempt > 1 the name ends in "-<attempt>" so
+// that every retry yields a different name.
+func getClaimNameFromTitle(title string, attempt int) string {
+	suffix := ""
+	if attempt > 1 {
+		suffix = "-" + strconv.Itoa(attempt)
+	}
+	maxLen := 40 - len(suffix)
+
+	chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
+
+	name := chunks[0]
+	if len(name) > maxLen {
+		// Keep the suffix on this path too: without it, attempts whose
+		// suffixes have the same length all truncate to the same name.
+		return name[:maxLen] + suffix
+	}
+
+	for _, chunk := range chunks[1:] {
+		tmpName := name + "-" + chunk
+		if len(tmpName) > maxLen {
+			// Only cut a chunk short when the name so far is under 20 bytes.
+			if len(name) < 20 {
+				name = tmpName[:maxLen]
+			}
+			break
+		}
+		name = tmpName
+	}
+
+	return name + suffix
+}
+
+// publishedNames holds the claim names this process has reserved or found to
+// be taken; publishedNamesMutex guards it.
+var publishedNamesMutex sync.RWMutex
+var publishedNames = map[string]bool{}
+
+// reserveClaimName marks name as taken and reports whether the caller was the
+// first to do so. Checking and marking under one lock keeps two workers from
+// both deciding that the same name is free.
+func reserveClaimName(name string) bool {
+	publishedNamesMutex.Lock()
+	defer publishedNamesMutex.Unlock()
+	if publishedNames[name] {
+		return false
+	}
+	publishedNames[name] = true
+	return true
+}
+
+// releaseClaimName forgets a reservation made by reserveClaimName.
+func releaseClaimName(name string) {
+	publishedNamesMutex.Lock()
+	defer publishedNamesMutex.Unlock()
+	delete(publishedNames, name)
+}
+
+// publishAndRetryExistingNames publishes filename under a claim name derived
+// from title. When the name is already reserved locally, or the daemon
+// reports "Multiple claims", it moves on to the next attempt number. Any
+// other publish error is returned as is.
+func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) error {
+	for attempt := 1; ; attempt++ {
+		name := getClaimNameFromTitle(title, attempt)
+
+		if !reserveClaimName(name) {
+			log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
+			continue
+		}
+
+		_, err := daemon.Publish(name, filename, amount, options)
+		if err == nil {
+			return nil
+		}
+		if !strings.Contains(err.Error(), "failed: Multiple claims (") {
+			// The publish failed for another reason, so free the name again.
+			releaseClaimName(name)
+			return err
+		}
+		log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
+	}
+}
diff --git a/sources/ucbVideo.go b/sources/ucbVideo.go
new file mode 100644
index 0000000..366d39c
--- /dev/null
+++ b/sources/ucbVideo.go
@@ -0,0 +1,224 @@
+package sources
+
+import (
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/lbryio/lbry.go/jsonrpc"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/go-errors/errors"
+	log "github.com/sirupsen/logrus"
+)
+
+// ucbVideo describes one video of the UC Berkeley import: its metadata and
+// the local directory its file is downloaded to.
+type ucbVideo struct {
+	id          string
+	title       string
+	channel     string
+	description string
+	publishedAt time.Time
+	dir         string
+}
+
+// NewUCBVideo builds a ucbVideo. publishedAt is parsed as RFC 3339; if it does
+// not parse, the zero time.Time is stored rather than reporting an error.
+func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo {
+	p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
+	return ucbVideo{
+		id:          id,
+		title:       title,
+		description: description,
+		channel:     channel,
+		dir:         dir,
+		publishedAt: p,
+	}
+}
+
+// ID returns the video's identifier.
+func (v ucbVideo) ID() string {
+	return v.id
+}
+
+// IDAndNum returns the ID followed by " (?)".
+func (v ucbVideo) IDAndNum() string {
+	return v.ID() + " (?)"
+}
+
+// PublishedAt returns the publication time given to NewUCBVideo.
+func (v ucbVideo) PublishedAt() time.Time {
+	return v.publishedAt
+	//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
+	//matches := r.FindStringSubmatch(v.title)
+	//if len(matches) > 0 {
+	//	year, _ := strconv.Atoi(matches[1])
+	//	month, _ := strconv.Atoi(matches[2])
+	//	day, _ := strconv.Atoi(matches[3])
+	//	return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
+	//}
+	//return time.Now()
+}
+
+// getFilename returns the local path of the video file, "<dir>/<id>.mp4".
+func (v ucbVideo) getFilename() string {
+	return v.dir + "/" + v.id + ".mp4"
+}
+
+// getClaimName returns the claim name for the given attempt. It defers to
+// getClaimNameFromTitle so both code paths name claims the same way.
+func (v ucbVideo) getClaimName(attempt int) string {
+	return getClaimNameFromTitle(v.title, attempt)
+}
+
+// getAbbrevDescription returns the trimmed description, cut to its first ten
+// lines and followed by "..." when it is longer than that.
+func (v ucbVideo) getAbbrevDescription() string {
+	maxLines := 10
+	description := strings.TrimSpace(v.description)
+	if strings.Count(description, "\n") < maxLines {
+		return description
+	}
+	return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
+}
+
+// newUCBSession opens an AWS session for us-east-2 with static credentials.
+// NOTE(review): the key ID and secret look like placeholders; confirm how the
+// real values are meant to be supplied.
+func newUCBSession() (*session.Session, error) {
+	creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
+	return session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
+}
+
+// download fetches the video from the lbry-niko2 bucket into getFilename().
+// A file that already exists is taken as a finished download, so a failed
+// attempt removes whatever it wrote.
+func (v ucbVideo) download() error {
+	videoPath := v.getFilename()
+
+	_, err := os.Stat(videoPath)
+	if err != nil && !os.IsNotExist(err) {
+		return err
+	} else if err == nil {
+		log.Debugln(v.id + " already exists at " + videoPath)
+		return nil
+	}
+
+	s, err := newUCBSession()
+	if err != nil {
+		return err
+	}
+	downloader := s3manager.NewDownloader(s)
+
+	out, err := os.Create(videoPath)
+	if err != nil {
+		return err
+	}
+
+	log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id)
+
+	bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
+		Bucket: aws.String("lbry-niko2"),
+		Key:    aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"),
+	})
+	closeErr := out.Close()
+	if err == nil && bytesWritten == 0 {
+		err = errors.New("zero bytes written")
+	}
+	if err == nil {
+		err = closeErr
+	}
+	if err != nil {
+		// Do not leave an empty or partial file behind: the next attempt would
+		// treat it as already downloaded.
+		if rmErr := os.Remove(videoPath); rmErr != nil {
+			log.Errorln("could not remove " + videoPath + ": " + rmErr.Error())
+		}
+		return err
+	}
+
+	return nil
+}
+
+// saveThumbnail copies the video's thumbnail from the lbry-niko2 bucket's
+// public URL into the berk.ninja bucket.
+func (v ucbVideo) saveThumbnail() error {
+	// Same timeout as YoutubeVideo.triggerThumbnailSave; http.Get has none.
+	client := &http.Client{Timeout: 30 * time.Second}
+	resp, err := client.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Without this check the body of an error response would be uploaded as
+	// the thumbnail.
+	if resp.StatusCode != http.StatusOK {
+		return errors.New("thumbnail download failed: " + resp.Status)
+	}
+
+	s, err := newUCBSession()
+	if err != nil {
+		return err
+	}
+	uploader := s3manager.NewUploader(s)
+
+	_, err = uploader.Upload(&s3manager.UploadInput{
+		Bucket:      aws.String("berk.ninja"),
+		Key:         aws.String("thumbnails/" + v.id),
+		ContentType: aws.String("image/jpeg"),
+		Body:        resp.Body,
+	})
+
+	return err
+}
+
+// publish publishes the downloaded file under a claim name derived from the
+// title, with "UC Berkeley" as the author. channelName is set only when it is
+// not empty.
+func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
+	options := jsonrpc.PublishOptions{
+		Title:        &v.title,
+		Author:       strPtr("UC Berkeley"),
+		Description:  strPtr(v.getAbbrevDescription()),
+		Language:     strPtr("en"),
+		ClaimAddress: &claimAddress,
+		Thumbnail:    strPtr("https://berk.ninja/thumbnails/" + v.id),
+		License:      strPtr("see description"),
+	}
+
+	if channelName != "" {
+		options.ChannelName = &channelName
+	}
+
+	return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
+}
+
+// Sync downloads the video and then publishes it. Each failure is wrapped
+// with a prefix naming the step that failed.
+func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
+	//download and thumbnail can be done in parallel
+	err := v.download()
+	if err != nil {
+		return errors.WrapPrefix(err, "download error", 0)
+	}
+	log.Debugln("Downloaded " + v.id)
+
+	//err = v.saveThumbnail()
+	//if err != nil {
+	//	return errors.WrapPrefix(err, "thumbnail error", 0)
+	//}
+	//log.Debugln("Created thumbnail for " + v.id)
+
+	err = v.publish(daemon, claimAddress, amount, channelName)
+	if err != nil {
+		return errors.WrapPrefix(err, "publish error", 0)
+	}
+
+	return nil
+}
diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go
index 88e19c9..b7240e8 100644
--- a/sources/youtubeVideo.go
+++ b/sources/youtubeVideo.go
@@ -58,7 +58,7 @@ func (v YoutubeVideo) getFilename() string {
 	return v.dir + "/" + v.id + ".mp4"
 }
 
-func (v YoutubeVideo) getClaimName() string {
+func (v YoutubeVideo) getClaimName(attempt int) string {
 	maxLen := 40
 	reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
 
@@ -92,7 +92,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
 	return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
 }
 
-func (v YoutubeVideo) Download() error {
+func (v YoutubeVideo) download() error {
 	videoPath := v.getFilename()
 
 	_, err := os.Stat(videoPath)
@@ -115,7 +115,7 @@ func (v YoutubeVideo) Download() error {
 	return nil
 }
 
-func (v YoutubeVideo) TriggerThumbnailSave() error {
+func (v YoutubeVideo) triggerThumbnailSave() error {
 	client := &http.Client{Timeout: 30 * time.Second}
 
 	params, err := json.Marshal(map[string]string{"videoid": v.id})
@@ -158,39 +158,38 @@ func (v YoutubeVideo) TriggerThumbnailSave() error {
 
 func strPtr(s string) *string { return &s }
 
-func (v YoutubeVideo) Publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
+func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
 	options := jsonrpc.PublishOptions{
 		Title:        &v.title,
 		Author:       &v.channelTitle,
 		Description:  strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id),
 		Language:     strPtr("en"),
 		ClaimAddress: &claimAddress,
-		Thumbnail:    strPtr("http://berk.ninja/thumbnails/" + v.id),
+		Thumbnail:    strPtr("https://berk.ninja/thumbnails/" + v.id),
 		License:      strPtr("Copyrighted (contact author)"),
 	}
 	if channelName != "" {
 		options.ChannelName = &channelName
 	}
 
-	_, err := daemon.Publish(v.getClaimName(), v.getFilename(), amount, options)
-	return err
+	return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
 }
 
 func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
 	//download and thumbnail can be done in parallel
-	err := v.Download()
+	err := v.download()
 	if err != nil {
 		return errors.WrapPrefix(err, "download error", 0)
 	}
 	log.Debugln("Downloaded " + v.id)
 
-	err = v.TriggerThumbnailSave()
+	err = v.triggerThumbnailSave()
 	if err != nil {
 		return errors.WrapPrefix(err, "thumbnail error", 0)
 	}
 	log.Debugln("Created thumbnail for " + v.id)
 
-	err = v.Publish(daemon, claimAddress, amount, channelName)
+	err = v.publish(daemon, claimAddress, amount, channelName)
 	if err != nil {
 		return errors.WrapPrefix(err, "publish error", 0)
 	}
diff --git a/ytsync.go b/ytsync.go
index f4c2a8f..635db20 100644
--- a/ytsync.go
+++ b/ytsync.go
@@ -1,7 +1,10 @@
 package ytsync
 
 import (
+	"bufio"
+	"encoding/csv"
 	"encoding/json"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"os"
@@ -81,11 +84,11 @@ func (s *Sync) FullCycle() error {
 	defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
 	walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
 
-	if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
-		if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
-			return errors.New("Tried to continue previous upload, but default_wallet already exists")
-		}
+	if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
+		return errors.New("default_wallet already exists")
+	}
 
+	if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
 		err = os.Rename(walletBackupDir, defaultWalletDir)
 		if err != nil {
 			return errors.Wrap(err, 0)
@@ -162,7 +165,11 @@ func (s *Sync) doSync() error {
 		go s.startWorker(i)
 	}
 
-	err = s.enqueueVideos()
+	if s.LbryChannelName == "@UCBerkeley" {
+		err = s.enqueueUCBVideos()
+	} else {
+		err = s.enqueueYoutubeVideos()
+	}
 	close(s.queue)
 	s.wg.Wait()
 	return err
@@ -193,7 +200,7 @@ func (s *Sync) startWorker(workerNum int) {
 			return
 		}
 
-		log.Println("========================================")
+		log.Println("================================================================================")
 
 		tryCount := 0
 		for {
@@ -209,6 +216,7 @@ func (s *Sync) startWorker(workerNum int) {
 					strings.Contains(err.Error(), " reason: 'This video contains content from") ||
 					strings.Contains(err.Error(), "dont know which claim to update") ||
 					strings.Contains(err.Error(), "uploader has not made this video available in your country") ||
+					strings.Contains(err.Error(), "download error: AccessDenied: Access Denied") ||
 					strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") {
 					log.Println("This error should not be retried at all")
 				} else if tryCount >= s.MaxTries {
@@ -225,7 +233,7 @@ func (s *Sync) startWorker(workerNum int) {
 	}
 }
 
-func (s *Sync) enqueueVideos() error {
+func (s *Sync) enqueueYoutubeVideos() error {
 	client := &http.Client{
 		Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
 	}
@@ -308,6 +316,64 @@ Enqueue:
 	return nil
 }
 
+// enqueueUCBVideos reads ucb.csv, builds one UCB video per record, sorts them
+// with byPublishedAt and feeds them to the queue until all are sent or a stop
+// is signalled.
+func (s *Sync) enqueueUCBVideos() error {
+	var videos []video
+
+	csvFile, err := os.Open("ucb.csv")
+	if err != nil {
+		return err
+	}
+	defer csvFile.Close()
+
+	reader := csv.NewReader(bufio.NewReader(csvFile))
+	for {
+		line, err := reader.Read()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+		// Columns 0 through 4 are read below; fail cleanly on a short record
+		// instead of panicking on the index.
+		if len(line) < 5 {
+			return errors.New("ucb.csv: record has fewer than 5 fields")
+		}
+		data := struct {
+			PublishedAt string `json:"publishedAt"`
+		}{}
+		err = json.Unmarshal([]byte(line[4]), &data)
+		if err != nil {
+			return err
+		}
+
+		videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
+	}
+
+	log.Printf("Publishing %d videos\n", len(videos))
+
+	sort.Sort(byPublishedAt(videos))
+
+Enqueue:
+	for _, v := range videos {
+		select {
+		case <-s.stop.Chan():
+			break Enqueue
+		default:
+		}
+
+		select {
+		case s.queue <- v:
+		case <-s.stop.Chan():
+			break Enqueue
+		}
+	}
+
+	return nil
+}
+
 func (s *Sync) processVideo(v video) error {
 	log.Println("Processing " + v.IDAndNum())
 	defer func(start time.Time) {