add support for new SDK

add support for direct thumbnails upload
remove UCB code
add support for republishing
refactoring
This commit is contained in:
Niko Storni 2019-05-03 05:11:52 +02:00
parent a648287e1c
commit 23b8fb4e54
9 changed files with 189 additions and 327 deletions

2
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect github.com/kr/pretty v0.1.0 // indirect
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f github.com/lbryio/lbry.go v0.0.0-20190503025608-3a22a0af0a45
github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936 github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/mitchellh/mapstructure v1.1.2 // indirect github.com/mitchellh/mapstructure v1.1.2 // indirect

15
go.sum
View file

@ -125,17 +125,11 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M= github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8= github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8=
github.com/lbryio/lbry.go v0.0.0-20190419005332-80b25b225e18 h1:ZhWjtvaq5r7julhcF9OSgx4bLv9UsdIx27zH1/fbrEc= github.com/lbryio/lbry.go v0.0.0-20190503025608-3a22a0af0a45 h1:6fpRUui1G8xKiQqL51fHJwrpqYSd4G3RIYWsRd+R8x8=
github.com/lbryio/lbry.go v0.0.0-20190419005332-80b25b225e18/go.mod h1:kd08aOMCuBVYJ3EafY4Kx3dRAWWQYhobJ9beREgsaRI= github.com/lbryio/lbry.go v0.0.0-20190503025608-3a22a0af0a45/go.mod h1:rAREtjHq/Wkkqy4+yt89Er/7JAVu6uMo3v8nevCEEIA=
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f h1:o6EZ7bAdYrS6pKp85SEr6Ywyy2JDJS1CY3ChkVsvSM4= github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002 h1:urfYK5ElpUrAv90auPLldoVC60LwiGAcY0OE6HJB9KI=
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f/go.mod h1:FubnMAYvLt2jGasG7BrQsokYHZ2wpNtWethPHUVauMc= github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
github.com/lbryio/lbryschema.go v0.0.0-20190422030648-322c658307e0 h1:/YWLlbbDefRGLs/ozyuRpvpwpFISYehwR4AAVlPthA8=
github.com/lbryio/lbryschema.go v0.0.0-20190422030648-322c658307e0/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:fbG/dzobG8r95KzMwckXiLMHfFjZaBRQqC9hPs2XAQ4= github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:fbG/dzobG8r95KzMwckXiLMHfFjZaBRQqC9hPs2XAQ4=
github.com/lbryio/types v0.0.0-20190405005919-54c3c28f676a h1:twWvrsBDvSb+qnmpSq3nvFrodgC5PpXUipyo4T/W790=
github.com/lbryio/types v0.0.0-20190405005919-54c3c28f676a/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lbryio/types v0.0.0-20190415181811-35ddf1afe731 h1:iERWR8Dkng30eRInI7gzolTEJBW9nBSK/sT+Z9aSipI=
github.com/lbryio/types v0.0.0-20190415181811-35ddf1afe731/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c h1:m3O7561xBQ00lfUVayW4c6SnpVbUDQtPUwGcGYSUYQA= github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c h1:m3O7561xBQ00lfUVayW4c6SnpVbUDQtPUwGcGYSUYQA=
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE= github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5 h1:AsEBgzv3DhuYHI/GiQh2HxvTP71HCCE9E/tzGUzGdtU= github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5 h1:AsEBgzv3DhuYHI/GiQh2HxvTP71HCCE9E/tzGUzGdtU=
@ -304,6 +298,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -2,6 +2,8 @@ package manager
import ( import (
"fmt" "fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"strings" "strings"
"syscall" "syscall"
"time" "time"
@ -70,17 +72,18 @@ func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool
} }
const ( const (
StatusPending = "pending" // waiting for permission to sync StatusPending = "pending" // waiting for permission to sync
StatusPendingEmail = "pendingemail" // permission granted but missing email StatusPendingEmail = "pendingemail" // permission granted but missing email
StatusQueued = "queued" // in sync queue. will be synced soon StatusQueued = "queued" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
StatusSynced = "synced" // done StatusSyncing = "syncing" // syncing now
StatusFailed = "failed" StatusSynced = "synced" // done
StatusFinalized = "finalized" // no more changes allowed StatusFailed = "failed"
StatusAbandoned = "abandoned" // deleted on youtube or banned StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned
) )
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned} var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
const ( const (
VideoStatusPublished = "published" VideoStatusPublished = "published"
@ -207,7 +210,12 @@ func (s *SyncManager) Start() error {
} }
return nil return nil
} }
func (s *SyncManager) GetS3AWSConfig() aws.Config {
return aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsS3ID, s.awsS3Secret, ""),
Region: &s.awsS3Region,
}
}
func (s *SyncManager) checkUsedSpace() error { func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.blobsDir) usedPctile, err := GetUsedSpace(s.blobsDir)
if err != nil { if err != nil {

View file

@ -7,8 +7,6 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc" "github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/util" "github.com/lbryio/lbry.go/extras/util"
@ -301,20 +299,8 @@ func (s *Sync) ensureChannelOwnership() error {
channelInfo := response.Items[0].Snippet channelInfo := response.Items[0].Snippet
thumbnail := channelInfo.Thumbnails.Default thumbnail := thumbs.GetBestThumbnail(channelInfo.Thumbnails)
if channelInfo.Thumbnails.Maxres != nil { thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
thumbnail = channelInfo.Thumbnails.Maxres
} else if channelInfo.Thumbnails.High != nil {
thumbnail = channelInfo.Thumbnails.High
} else if channelInfo.Thumbnails.Medium != nil {
thumbnail = channelInfo.Thumbnails.Medium
} else if channelInfo.Thumbnails.Standard != nil {
thumbnail = channelInfo.Thumbnails.Standard
}
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, aws.Config{
Credentials: credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, ""),
Region: &s.AwsS3Region,
})
if err != nil { if err != nil {
return err return err
} }

View file

@ -1,11 +1,7 @@
package manager package manager
import ( import (
"bufio"
"encoding/csv"
"encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
@ -21,6 +17,7 @@ import (
"github.com/lbryio/ytsync/namer" "github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk" "github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources" "github.com/lbryio/ytsync/sources"
"github.com/lbryio/ytsync/thumbs"
"github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc" "github.com/lbryio/lbry.go/extras/jsonrpc"
@ -51,7 +48,7 @@ type video interface {
IDAndNum() string IDAndNum() string
PlaylistPosition() int PlaylistPosition() int
PublishedAt() time.Time PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer, float64) (*sources.SyncSummary, error) Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool) (*sources.SyncSummary, error)
} }
// sorting videos // sorting videos
@ -319,6 +316,7 @@ func (s *Sync) FullCycle() (e error) {
return nil return nil
} }
func (s *Sync) setChannelTerminationStatus(e *error) { func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil { if *e != nil {
//conditions for which a channel shouldn't be marked as failed //conditions for which a channel shouldn't be marked as failed
@ -390,7 +388,7 @@ func logShutdownError(shutdownErr error) {
var thumbnailHosts = []string{ var thumbnailHosts = []string{
"berk.ninja/thumbnails/", "berk.ninja/thumbnails/",
"https://thumbnails.lbry.com/", thumbs.ThumbnailEndpoint,
} }
func isYtsyncClaim(c jsonrpc.Claim) bool { func isYtsyncClaim(c jsonrpc.Claim) bool {
@ -545,7 +543,7 @@ func (s *Sync) doSync() error {
} }
if s.LbryChannelName == "@UCBerkeley" { if s.LbryChannelName == "@UCBerkeley" {
err = s.enqueueUCBVideos() err = errors.Err("UCB is not supported on this version of YTSYNC")
} else { } else {
err = s.enqueueYoutubeVideos() err = s.enqueueYoutubeVideos()
} }
@ -713,14 +711,14 @@ func (s *Sync) enqueueYoutubeVideos() error {
playlistMap[item.Snippet.ResourceId.VideoId] = item.Snippet playlistMap[item.Snippet.ResourceId.VideoId] = item.Snippet
videoIDs[i] = item.Snippet.ResourceId.VideoId videoIDs[i] = item.Snippet.ResourceId.VideoId
} }
req2 := service.Videos.List("snippet,contentDetails").Id(strings.Join(videoIDs[:], ",")) req2 := service.Videos.List("snippet,contentDetails,recordingDetails").Id(strings.Join(videoIDs[:], ","))
videosListResponse, err := req2.Do() videosListResponse, err := req2.Do()
if err != nil { if err != nil {
return errors.Prefix("error getting videos info", err) return errors.Prefix("error getting videos info", err)
} }
for _, item := range videosListResponse.Items { for _, item := range videosListResponse.Items {
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position)) videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig()))
} }
log.Infof("Got info for %d videos from youtube API", len(videos)) log.Infof("Got info for %d videos from youtube API", len(videos))
@ -752,55 +750,6 @@ Enqueue:
return nil return nil
} }
func (s *Sync) enqueueUCBVideos() error {
var videos []video
csvFile, err := os.Open("ucb.csv")
if err != nil {
return err
}
reader := csv.NewReader(bufio.NewReader(csvFile))
for {
line, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
data := struct {
PublishedAt string `json:"publishedAt"`
}{}
err = json.Unmarshal([]byte(line[4]), &data)
if err != nil {
return err
}
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
}
log.Printf("Publishing %d videos\n", len(videos))
sort.Sort(byPublishedAt(videos))
Enqueue:
for _, v := range videos {
select {
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
break Enqueue
}
}
return nil
}
func (s *Sync) processVideo(v video) (err error) { func (s *Sync) processVideo(v video) (err error) {
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
@ -851,8 +800,17 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil { if err != nil {
return err return err
} }
sp := sources.SyncParams{
ClaimAddress: s.claimAddress,
Amount: publishAmount,
ChannelID: s.lbryChannelID,
MaxVideoSize: s.Manager.maxVideoSize,
Namer: s.namer,
MaxVideoLength: s.Manager.maxVideoLength,
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer, s.Manager.maxVideoLength) isUpgradeSync := s.Manager.syncStatus == StatusPendingUpgrade
summary, err := v.Sync(s.daemon, sp, &sv, isUpgradeSync)
if err != nil { if err != nil {
return err return err
} }

View file

@ -76,10 +76,13 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC
} }
type SyncedVideo struct { type SyncedVideo struct {
VideoID string `json:"video_id"` VideoID string `json:"video_id"`
Published bool `json:"published"` Published bool `json:"published"`
FailureReason string `json:"failure_reason"` FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"` ClaimName string `json:"claim_name"`
ClaimID string `json:"claim_id"`
Size int64 `json:"size"`
MetadataVersion int8 `json:"metadata_version"`
} }
func sanitizeFailureReason(s *string) { func sanitizeFailureReason(s *string) {
@ -121,7 +124,9 @@ func (a *APIConfig) SetChannelStatus(channelID string, status string, failureRea
claimNames := make(map[string]bool) claimNames := make(map[string]bool)
for _, v := range response.Data { for _, v := range response.Data {
svs[v.VideoID] = v svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published if v.ClaimName != "" {
claimNames[v.ClaimName] = v.Published
}
} }
return svs, claimNames, nil return svs, claimNames, nil
} }

View file

@ -1,219 +0,0 @@
package sources
import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/ytsync/namer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
log "github.com/sirupsen/logrus"
)
type ucbVideo struct {
id string
title string
channel string
description string
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return &ucbVideo{
id: id,
title: title,
description: description,
channel: channel,
dir: dir,
publishedAt: p,
}
}
func (v *ucbVideo) ID() string {
return v.id
}
func (v *ucbVideo) PlaylistPosition() int {
return 0
}
func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)"
}
func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title)
//if len(matches) > 0 {
// year, _ := strconv.Atoi(matches[1])
// month, _ := strconv.Atoi(matches[2])
// day, _ := strconv.Atoi(matches[3])
// return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
//}
//return time.Now()
}
func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4"
}
func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}
func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
return description
}
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v *ucbVideo) download() error {
videoPath := v.getFilename()
_, err := os.Stat(videoPath)
if err != nil && !os.IsNotExist(err) {
return err
} else if err == nil {
log.Debugln(v.id + " already exists at " + videoPath)
return nil
}
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s)
out, err := os.Create(videoPath)
if err != nil {
return err
}
defer out.Close()
log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id)
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String("lbry-niko2"),
Key: aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"),
})
if err != nil {
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return nil
}
func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil {
return err
}
defer resp.Body.Close()
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("berk.ninja"),
Key: aws.String("thumbnails/" + v.id),
ContentType: aws.String("image/jpeg"),
Body: resp.Body,
})
return err
}
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
options := jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: v.title,
Description: v.getAbbrevDescription(),
ClaimAddress: &claimAddress,
Languages: []string{"en"},
ThumbnailURL: strPtr("https://berk.ninja/thumbnails/" + v.id),
Tags: []string{},
},
Author: strPtr("UC Berkeley"),
License: strPtr("see description"),
StreamType: &jsonrpc.StreamTypeVideo,
ChannelID: &channelID,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
}
func (v *ucbVideo) Size() *int64 {
return nil
}
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer, maxVideoLength float64) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
return nil, errors.Prefix("download error", err)
}
log.Debugln("Downloaded " + v.id)
//err = v.SaveThumbnail()
//if err != nil {
// return errors.WrapPrefix(err, "thumbnail error", 0)
//}
//log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
if err != nil {
return nil, errors.Prefix("publish error", err)
}
return summary, nil
}

View file

@ -3,6 +3,7 @@ package sources
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"math" "math"
"net/http" "net/http"
@ -12,12 +13,16 @@ import (
"strings" "strings"
"time" "time"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/thumbs"
"github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc" "github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/util" "github.com/lbryio/lbry.go/extras/util"
"github.com/lbryio/ytsync/namer" "github.com/lbryio/ytsync/namer"
"github.com/ChannelMeter/iso8601duration" "github.com/ChannelMeter/iso8601duration"
"github.com/aws/aws-sdk-go/aws"
"github.com/nikooo777/ytdl" "github.com/nikooo777/ytdl"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3" "google.golang.org/api/youtube/v3"
@ -36,9 +41,45 @@ type YoutubeVideo struct {
dir string dir string
youtubeInfo *youtube.Video youtubeInfo *youtube.Video
tags []string tags []string
awsConfig aws.Config
} }
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64) *YoutubeVideo { var youtubeCategories = map[string]string{
"1": "Film & Animation",
"2": "Autos & Vehicles",
"10": "Music",
"15": "Pets & Animals",
"17": "Sports",
"18": "Short Movies",
"19": "Travel & Events",
"20": "Gaming",
"21": "Videoblogging",
"22": "People & Blogs",
"23": "Comedy",
"24": "Entertainment",
"25": "News & Politics",
"26": "Howto & Style",
"27": "Education",
"28": "Science & Technology",
"29": "Nonprofits & Activism",
"30": "Movies",
"31": "Anime/Animation",
"32": "Action/Adventure",
"33": "Classics",
"34": "Comedy",
"35": "Documentary",
"36": "Drama",
"37": "Family",
"38": "Foreign",
"39": "Horror",
"40": "Sci-Fi/Fantasy",
"41": "Thriller",
"42": "Shorts",
"43": "Shows",
"44": "Trailers",
}
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
return &YoutubeVideo{ return &YoutubeVideo{
id: videoData.Id, id: videoData.Id,
@ -49,6 +90,7 @@ func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPositio
publishedAt: publishedAt, publishedAt: publishedAt,
dir: directory, dir: directory,
youtubeInfo: videoData, youtubeInfo: videoData,
awsConfig: awsConfig,
} }
} }
@ -267,7 +309,7 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
Description: v.getAbbrevDescription() + additionalDescription, Description: v.getAbbrevDescription() + additionalDescription,
ClaimAddress: &claimAddress, ClaimAddress: &claimAddress,
Languages: languages, Languages: languages,
ThumbnailURL: strPtr("https://thumbnails.lbry.com/" + v.id), ThumbnailURL: strPtr(thumbs.ThumbnailEndpoint + v.id),
Tags: v.youtubeInfo.Snippet.Tags, Tags: v.youtubeInfo.Snippet.Tags,
}, },
Author: strPtr(v.channelTitle), Author: strPtr(v.channelTitle),
@ -285,9 +327,24 @@ func (v *YoutubeVideo) Size() *int64 {
return v.size return v.size
} }
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer, maxVideoLength float64) (*SyncSummary, error) { type SyncParams struct {
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 ClaimAddress string
v.maxVideoLength = maxVideoLength Amount float64
ChannelID string
MaxVideoSize int
Namer *namer.Namer
MaxVideoLength float64
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool) (*SyncSummary, error) {
v.maxVideoSize = int64(params.MaxVideoSize) * 1024 * 1024
v.maxVideoLength = params.MaxVideoLength
if reprocess {
summary, err := v.reprocess(daemon, params.ChannelID, existingVideoData)
return summary, err
}
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -301,9 +358,66 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
} }
log.Debugln("Created thumbnail for " + v.id) log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer) summary, err := v.publish(daemon, params.ClaimAddress, params.Amount, params.ChannelID, params.Namer)
//delete the video in all cases (and ignore the error) //delete the video in all cases (and ignore the error)
_ = v.delete() _ = v.delete()
return summary, errors.Prefix("publish error", err) return summary, errors.Prefix("publish error", err)
} }
func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, channelID string, existingVideoData *sdk.SyncedVideo) (*SyncSummary, error) {
c, err := daemon.ClaimSearch(nil, &existingVideoData.ClaimID, nil, nil)
if err != nil {
return nil, errors.Err(err)
}
if len(c.Claims) == 0 {
return nil, errors.Err("cannot reprocess: no claim found for this video")
} else if len(c.Claims) > 1 {
return nil, errors.Err("cannot reprocess: too many claims. claimID: %s", existingVideoData.ClaimID)
}
currentClaim := c.Claims[0]
var languages []string = nil
if v.youtubeInfo.Snippet.DefaultLanguage != "" {
languages = []string{v.youtubeInfo.Snippet.DefaultLanguage}
}
var locations []jsonrpc.Location = nil
if v.youtubeInfo.RecordingDetails.Location != nil {
locations = []jsonrpc.Location{{
Latitude: util.PtrToString(fmt.Sprintf("%.7f", v.youtubeInfo.RecordingDetails.Location.Latitude)),
Longitude: util.PtrToString(fmt.Sprintf("%.7f", v.youtubeInfo.RecordingDetails.Location.Longitude)),
}}
}
thumbnailURL := ""
if currentClaim.Value.GetThumbnail() == nil {
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Snippet.Thumbnails)
thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.Url, v.ID(), v.awsConfig)
} else {
thumbnailURL = thumbs.ThumbnailEndpoint + v.ID()
}
tags := append(v.youtubeInfo.Snippet.Tags, youtubeCategories[v.youtubeInfo.Snippet.CategoryId])
videoDuration, err := duration.FromString(v.youtubeInfo.ContentDetails.Duration)
_, err = daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
StreamCreateOptions: &jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: v.title,
Description: v.getAbbrevDescription(),
Tags: tags,
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
},
Author: util.PtrToString(""),
License: strPtr("Copyrighted (contact author)"),
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
VideoDuration: util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds()))),
ChannelID: &channelID,
},
})
return &SyncSummary{}, nil
}

View file

@ -1,6 +1,7 @@
package thumbs package thumbs
import ( import (
"google.golang.org/api/youtube/v3"
"io" "io"
"net/http" "net/http"
"os" "os"
@ -21,6 +22,7 @@ type thumbnailUploader struct {
} }
const thumbnailPath = "/tmp/ytsync_thumbnails/" const thumbnailPath = "/tmp/ytsync_thumbnails/"
const ThumbnailEndpoint = "https://thumbnails.lbry.com/"
func (u *thumbnailUploader) downloadThumbnail() error { func (u *thumbnailUploader) downloadThumbnail() error {
_ = os.Mkdir(thumbnailPath, 0750) _ = os.Mkdir(thumbnailPath, 0750)
@ -63,7 +65,7 @@ func (u *thumbnailUploader) uploadThumbnail() error {
Key: key, Key: key,
Body: thumb, Body: thumb,
}) })
u.mirroredUrl = "https://thumbnails.lbry.com/" + u.name u.mirroredUrl = ThumbnailEndpoint + u.name
return errors.Err(err) return errors.Err(err)
} }
@ -92,3 +94,16 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
return tu.mirroredUrl, nil return tu.mirroredUrl, nil
} }
func GetBestThumbnail(thumbnails *youtube.ThumbnailDetails) *youtube.Thumbnail {
if thumbnails.Maxres != nil {
return thumbnails.Maxres
} else if thumbnails.High != nil {
return thumbnails.High
} else if thumbnails.Medium != nil {
return thumbnails.Medium
} else if thumbnails.Standard != nil {
return thumbnails.Standard
}
return thumbnails.Default
}