ucb
This commit is contained in:
parent
261dbed101
commit
2cd8552702
5 changed files with 363 additions and 20 deletions
11
setup.go
11
setup.go
|
@ -27,9 +27,14 @@ func (s *Sync) walletSetup() error {
|
||||||
balance := decimal.Decimal(*balanceResp)
|
balance := decimal.Decimal(*balanceResp)
|
||||||
log.Debugf("Starting balance is %s", balance.String())
|
log.Debugf("Starting balance is %s", balance.String())
|
||||||
|
|
||||||
numOnSource, err := s.CountVideos()
|
var numOnSource uint64
|
||||||
if err != nil {
|
if s.LbryChannelName == "@UCBerkeley" {
|
||||||
return err
|
numOnSource = 10104
|
||||||
|
} else {
|
||||||
|
numOnSource, err = s.CountVideos()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
log.Debugf("Source channel has %d videos", numOnSource)
|
log.Debugf("Source channel has %d videos", numOnSource)
|
||||||
|
|
||||||
|
|
75
sources/shared.go
Normal file
75
sources/shared.go
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
package sources
|
||||||
|
|
||||||
|
import (
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||||
|
|
||||||
|
func getClaimNameFromTitle(title string, attempt int) string {
|
||||||
|
suffix := ""
|
||||||
|
if attempt > 1 {
|
||||||
|
suffix = "-" + strconv.Itoa(attempt)
|
||||||
|
}
|
||||||
|
maxLen := 40 - len(suffix)
|
||||||
|
|
||||||
|
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
|
||||||
|
|
||||||
|
name := chunks[0]
|
||||||
|
if len(name) > maxLen {
|
||||||
|
return name[:maxLen]
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chunk := range chunks[1:] {
|
||||||
|
tmpName := name + "-" + chunk
|
||||||
|
if len(tmpName) > maxLen {
|
||||||
|
if len(name) < 20 {
|
||||||
|
name = tmpName[:maxLen]
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
name = tmpName
|
||||||
|
}
|
||||||
|
|
||||||
|
return name + suffix
|
||||||
|
}
|
||||||
|
|
||||||
|
var publishedNamesMutex sync.RWMutex
|
||||||
|
var publishedNames = map[string]bool{}
|
||||||
|
|
||||||
|
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) error {
|
||||||
|
attempt := 0
|
||||||
|
for {
|
||||||
|
attempt++
|
||||||
|
name := getClaimNameFromTitle(title, attempt)
|
||||||
|
|
||||||
|
publishedNamesMutex.RLock()
|
||||||
|
_, exists := publishedNames[name]
|
||||||
|
publishedNamesMutex.RUnlock()
|
||||||
|
if exists {
|
||||||
|
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := daemon.Publish(name, filename, amount, options)
|
||||||
|
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
||||||
|
publishedNamesMutex.Lock()
|
||||||
|
publishedNames[name] = true
|
||||||
|
publishedNamesMutex.Unlock()
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
207
sources/ucbVideo.go
Normal file
207
sources/ucbVideo.go
Normal file
|
@ -0,0 +1,207 @@
|
||||||
|
package sources
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/go-errors/errors"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ucbVideo struct {
|
||||||
|
id string
|
||||||
|
title string
|
||||||
|
channel string
|
||||||
|
description string
|
||||||
|
publishedAt time.Time
|
||||||
|
dir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) ucbVideo {
|
||||||
|
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
|
||||||
|
return ucbVideo{
|
||||||
|
id: id,
|
||||||
|
title: title,
|
||||||
|
description: description,
|
||||||
|
channel: channel,
|
||||||
|
dir: dir,
|
||||||
|
publishedAt: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) ID() string {
|
||||||
|
return v.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) IDAndNum() string {
|
||||||
|
return v.ID() + " (?)"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) PublishedAt() time.Time {
|
||||||
|
return v.publishedAt
|
||||||
|
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
|
||||||
|
//matches := r.FindStringSubmatch(v.title)
|
||||||
|
//if len(matches) > 0 {
|
||||||
|
// year, _ := strconv.Atoi(matches[1])
|
||||||
|
// month, _ := strconv.Atoi(matches[2])
|
||||||
|
// day, _ := strconv.Atoi(matches[3])
|
||||||
|
// return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
|
||||||
|
//}
|
||||||
|
//return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) getFilename() string {
|
||||||
|
return v.dir + "/" + v.id + ".mp4"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) getClaimName(attempt int) string {
|
||||||
|
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||||
|
suffix := ""
|
||||||
|
if attempt > 1 {
|
||||||
|
suffix = "-" + strconv.Itoa(attempt)
|
||||||
|
}
|
||||||
|
maxLen := 40 - len(suffix)
|
||||||
|
|
||||||
|
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
|
||||||
|
|
||||||
|
name := chunks[0]
|
||||||
|
if len(name) > maxLen {
|
||||||
|
return name[:maxLen]
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chunk := range chunks[1:] {
|
||||||
|
tmpName := name + "-" + chunk
|
||||||
|
if len(tmpName) > maxLen {
|
||||||
|
if len(name) < 20 {
|
||||||
|
name = tmpName[:maxLen]
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
name = tmpName
|
||||||
|
}
|
||||||
|
|
||||||
|
return name + suffix
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) getAbbrevDescription() string {
|
||||||
|
maxLines := 10
|
||||||
|
description := strings.TrimSpace(v.description)
|
||||||
|
if strings.Count(description, "\n") < maxLines {
|
||||||
|
return description
|
||||||
|
}
|
||||||
|
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) download() error {
|
||||||
|
videoPath := v.getFilename()
|
||||||
|
|
||||||
|
_, err := os.Stat(videoPath)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return err
|
||||||
|
} else if err == nil {
|
||||||
|
log.Debugln(v.id + " already exists at " + videoPath)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
|
||||||
|
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
downloader := s3manager.NewDownloader(s)
|
||||||
|
|
||||||
|
out, err := os.Create(videoPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer out.Close()
|
||||||
|
|
||||||
|
log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id)
|
||||||
|
|
||||||
|
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
|
||||||
|
Bucket: aws.String("lbry-niko2"),
|
||||||
|
Key: aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if bytesWritten == 0 {
|
||||||
|
return errors.New("zero bytes written")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) saveThumbnail() error {
|
||||||
|
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
|
||||||
|
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
uploader := s3manager.NewUploader(s)
|
||||||
|
|
||||||
|
_, err = uploader.Upload(&s3manager.UploadInput{
|
||||||
|
Bucket: aws.String("berk.ninja"),
|
||||||
|
Key: aws.String("thumbnails/" + v.id),
|
||||||
|
ContentType: aws.String("image/jpeg"),
|
||||||
|
Body: resp.Body,
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
|
||||||
|
options := jsonrpc.PublishOptions{
|
||||||
|
Title: &v.title,
|
||||||
|
Author: strPtr("UC Berkeley"),
|
||||||
|
Description: strPtr(v.getAbbrevDescription()),
|
||||||
|
Language: strPtr("en"),
|
||||||
|
ClaimAddress: &claimAddress,
|
||||||
|
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
||||||
|
License: strPtr("see description"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if channelName != "" {
|
||||||
|
options.ChannelName = &channelName
|
||||||
|
}
|
||||||
|
|
||||||
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
|
||||||
|
//download and thumbnail can be done in parallel
|
||||||
|
err := v.download()
|
||||||
|
if err != nil {
|
||||||
|
return errors.WrapPrefix(err, "download error", 0)
|
||||||
|
}
|
||||||
|
log.Debugln("Downloaded " + v.id)
|
||||||
|
|
||||||
|
//err = v.SaveThumbnail()
|
||||||
|
//if err != nil {
|
||||||
|
// return errors.WrapPrefix(err, "thumbnail error", 0)
|
||||||
|
//}
|
||||||
|
//log.Debugln("Created thumbnail for " + v.id)
|
||||||
|
|
||||||
|
err = v.publish(daemon, claimAddress, amount, channelName)
|
||||||
|
if err != nil {
|
||||||
|
return errors.WrapPrefix(err, "publish error", 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -58,7 +58,7 @@ func (v YoutubeVideo) getFilename() string {
|
||||||
return v.dir + "/" + v.id + ".mp4"
|
return v.dir + "/" + v.id + ".mp4"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) getClaimName() string {
|
func (v YoutubeVideo) getClaimName(attempt int) string {
|
||||||
maxLen := 40
|
maxLen := 40
|
||||||
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ func (v YoutubeVideo) getAbbrevDescription() string {
|
||||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) Download() error {
|
func (v YoutubeVideo) download() error {
|
||||||
videoPath := v.getFilename()
|
videoPath := v.getFilename()
|
||||||
|
|
||||||
_, err := os.Stat(videoPath)
|
_, err := os.Stat(videoPath)
|
||||||
|
@ -115,7 +115,7 @@ func (v YoutubeVideo) Download() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) TriggerThumbnailSave() error {
|
func (v YoutubeVideo) triggerThumbnailSave() error {
|
||||||
client := &http.Client{Timeout: 30 * time.Second}
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
|
||||||
params, err := json.Marshal(map[string]string{"videoid": v.id})
|
params, err := json.Marshal(map[string]string{"videoid": v.id})
|
||||||
|
@ -158,39 +158,38 @@ func (v YoutubeVideo) TriggerThumbnailSave() error {
|
||||||
|
|
||||||
func strPtr(s string) *string { return &s }
|
func strPtr(s string) *string { return &s }
|
||||||
|
|
||||||
func (v YoutubeVideo) Publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
|
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
|
||||||
options := jsonrpc.PublishOptions{
|
options := jsonrpc.PublishOptions{
|
||||||
Title: &v.title,
|
Title: &v.title,
|
||||||
Author: &v.channelTitle,
|
Author: &v.channelTitle,
|
||||||
Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id),
|
Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id),
|
||||||
Language: strPtr("en"),
|
Language: strPtr("en"),
|
||||||
ClaimAddress: &claimAddress,
|
ClaimAddress: &claimAddress,
|
||||||
Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id),
|
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
||||||
License: strPtr("Copyrighted (contact author)"),
|
License: strPtr("Copyrighted (contact author)"),
|
||||||
}
|
}
|
||||||
if channelName != "" {
|
if channelName != "" {
|
||||||
options.ChannelName = &channelName
|
options.ChannelName = &channelName
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := daemon.Publish(v.getClaimName(), v.getFilename(), amount, options)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
|
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.Download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WrapPrefix(err, "download error", 0)
|
return errors.WrapPrefix(err, "download error", 0)
|
||||||
}
|
}
|
||||||
log.Debugln("Downloaded " + v.id)
|
log.Debugln("Downloaded " + v.id)
|
||||||
|
|
||||||
err = v.TriggerThumbnailSave()
|
err = v.triggerThumbnailSave()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WrapPrefix(err, "thumbnail error", 0)
|
return errors.WrapPrefix(err, "thumbnail error", 0)
|
||||||
}
|
}
|
||||||
log.Debugln("Created thumbnail for " + v.id)
|
log.Debugln("Created thumbnail for " + v.id)
|
||||||
|
|
||||||
err = v.Publish(daemon, claimAddress, amount, channelName)
|
err = v.publish(daemon, claimAddress, amount, channelName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.WrapPrefix(err, "publish error", 0)
|
return errors.WrapPrefix(err, "publish error", 0)
|
||||||
}
|
}
|
||||||
|
|
71
ytsync.go
71
ytsync.go
|
@ -1,7 +1,10 @@
|
||||||
package ytsync
|
package ytsync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/csv"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -81,11 +84,11 @@ func (s *Sync) FullCycle() error {
|
||||||
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
|
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
|
||||||
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
|
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
|
||||||
|
|
||||||
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
|
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
|
||||||
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
|
return errors.New("default_wallet already exists")
|
||||||
return errors.New("Tried to continue previous upload, but default_wallet already exists")
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
|
||||||
err = os.Rename(walletBackupDir, defaultWalletDir)
|
err = os.Rename(walletBackupDir, defaultWalletDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, 0)
|
return errors.Wrap(err, 0)
|
||||||
|
@ -162,7 +165,11 @@ func (s *Sync) doSync() error {
|
||||||
go s.startWorker(i)
|
go s.startWorker(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.enqueueVideos()
|
if s.LbryChannelName == "@UCBerkeley" {
|
||||||
|
err = s.enqueueUCBVideos()
|
||||||
|
} else {
|
||||||
|
err = s.enqueueYoutubeVideos()
|
||||||
|
}
|
||||||
close(s.queue)
|
close(s.queue)
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
return err
|
return err
|
||||||
|
@ -193,7 +200,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("========================================")
|
log.Println("================================================================================")
|
||||||
|
|
||||||
tryCount := 0
|
tryCount := 0
|
||||||
for {
|
for {
|
||||||
|
@ -209,6 +216,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
strings.Contains(err.Error(), " reason: 'This video contains content from") ||
|
strings.Contains(err.Error(), " reason: 'This video contains content from") ||
|
||||||
strings.Contains(err.Error(), "dont know which claim to update") ||
|
strings.Contains(err.Error(), "dont know which claim to update") ||
|
||||||
strings.Contains(err.Error(), "uploader has not made this video available in your country") ||
|
strings.Contains(err.Error(), "uploader has not made this video available in your country") ||
|
||||||
|
strings.Contains(err.Error(), "download error: AccessDenied: Access Denied") ||
|
||||||
strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") {
|
strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") {
|
||||||
log.Println("This error should not be retried at all")
|
log.Println("This error should not be retried at all")
|
||||||
} else if tryCount >= s.MaxTries {
|
} else if tryCount >= s.MaxTries {
|
||||||
|
@ -225,7 +233,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) enqueueVideos() error {
|
func (s *Sync) enqueueYoutubeVideos() error {
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
||||||
}
|
}
|
||||||
|
@ -308,6 +316,55 @@ Enqueue:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Sync) enqueueUCBVideos() error {
|
||||||
|
var videos []video
|
||||||
|
|
||||||
|
csvFile, err := os.Open("ucb.csv")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reader := csv.NewReader(bufio.NewReader(csvFile))
|
||||||
|
for {
|
||||||
|
line, err := reader.Read()
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
data := struct {
|
||||||
|
PublishedAt string `json:"publishedAt"`
|
||||||
|
}{}
|
||||||
|
err = json.Unmarshal([]byte(line[4]), &data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Publishing %d videos\n", len(videos))
|
||||||
|
|
||||||
|
sort.Sort(byPublishedAt(videos))
|
||||||
|
|
||||||
|
Enqueue:
|
||||||
|
for _, v := range videos {
|
||||||
|
select {
|
||||||
|
case <-s.stop.Chan():
|
||||||
|
break Enqueue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case s.queue <- v:
|
||||||
|
case <-s.stop.Chan():
|
||||||
|
break Enqueue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Sync) processVideo(v video) error {
|
func (s *Sync) processVideo(v video) error {
|
||||||
log.Println("Processing " + v.IDAndNum())
|
log.Println("Processing " + v.IDAndNum())
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
|
|
Loading…
Reference in a new issue