diff --git a/cmd/test.go b/cmd/test.go index f521dc9..b1147d2 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -20,15 +20,17 @@ func init() { RootCmd.AddCommand(testCmd) } +func strPtr(s string) *string { return &s } + func test(cmd *cobra.Command, args []string) { - daemon = jsonrpc.NewClient("") + daemon := jsonrpc.NewClient("") addresses, err := daemon.WalletList() if err != nil { panic(err) } else if addresses == nil || len(*addresses) == 0 { panic(fmt.Errorf("could not find an address in wallet")) } - claimAddress = (*addresses)[0] + claimAddress := (*addresses)[0] if claimAddress == "" { panic(fmt.Errorf("found blank claim address")) } diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 6a0dc6d..b4e0c0a 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -1,28 +1,10 @@ package cmd import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "os" - "regexp" - "sort" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" + sync "github.com/lbryio/lbry.go/ytsync" - "github.com/lbryio/lbry.go/jsonrpc" - - "github.com/garyburd/redigo/redis" - ytdl "github.com/kkdai/youtube" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "google.golang.org/api/googleapi/transport" - "google.golang.org/api/youtube/v3" ) func init() { @@ -37,58 +19,17 @@ func init() { RootCmd.AddCommand(ytSyncCmd) } -const ( - concurrentVideos = 1 - redisHashKey = "ytsync" - redisSyncedVal = "t" - defaultMaxTries = 1 -) - -type video struct { - id string - channelID string - channelTitle string - title string - description string - playlistPosition int64 - publishedAt time.Time -} - -func (v video) getFilename() string { - return videoDirectory + "/" + v.id + ".mp4" -} - -// sorting videos -type byPublishedAt []video - -func (a byPublishedAt) Len() int { return len(a) } -func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byPublishedAt) Less(i, j int) bool { return a[i].publishedAt.Before(a[j].publishedAt) } - -type byPlaylistPosition []video - -func (a byPlaylistPosition) Len() int { return len(a) } -func (a byPlaylistPosition) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byPlaylistPosition) Less(i, j int) bool { return a[i].playlistPosition < a[j].playlistPosition } +const defaultMaxTries = 1 var ( - ytAPIKey string - channelID string - lbryChannelName string - stopOnError bool - maxTries int - - daemon *jsonrpc.Client - claimAddress string - videoDirectory string - redisPool *redis.Pool + stopOnError bool + maxTries int ) func ytsync(cmd *cobra.Command, args []string) { - var err error - - ytAPIKey = args[0] - channelID = args[1] + ytAPIKey := args[0] + channelID := args[1] + lbryChannelName := "" if len(args) > 2 { lbryChannelName = args[2] } @@ -102,397 +43,17 @@ func ytsync(cmd *cobra.Command, args []string) { return } - redisPool = &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 5 * time.Minute, - Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Minute { - return nil - } - _, err := c.Do("PING") - return err - }, + s := sync.Sync{ + YoutubeAPIKey: ytAPIKey, + YoutubeChannelID: channelID, + LbryChannelName: lbryChannelName, + StopOnError: stopOnError, + MaxTries: maxTries, + ConcurrentVideos: 1, } - var wg sync.WaitGroup - videoQueue := make(chan video) - - stopEnqueuing := make(chan struct{}) - sendStopEnqueuing := sync.Once{} - - var videoErrored atomic.Value - videoErrored.Store(false) - if stopOnError { - log.Println("Will stop publishing if an error is detected") - } - - daemon = jsonrpc.NewClient("") - videoDirectory, err = ioutil.TempDir("", "ytsync") + err := s.Go() if err != nil { panic(err) } - - if lbryChannelName != "" { - err = ensureChannelOwnership() - if err != nil { - panic(err) - } - } - - addresses, err := daemon.WalletList() - if err != nil { - panic(err) - } else if addresses == nil || len(*addresses) == 0 { - panic(fmt.Errorf("could not find an address in wallet")) - } - claimAddress = (*addresses)[0] - if claimAddress == "" { - panic(fmt.Errorf("found blank claim address")) - } - - for i := 0; i < concurrentVideos; i++ { - go func() { - wg.Add(1) - defer wg.Done() - - for { - v, more := <-videoQueue - if !more { - return - } - if stopOnError && videoErrored.Load().(bool) { - log.Println("Video errored. Exiting") - return - } - - tryCount := 0 - for { - tryCount++ - err := processVideo(v) - - if err != nil { - log.Errorln("error processing video: " + err.Error()) - if stopOnError { - videoErrored.Store(true) - sendStopEnqueuing.Do(func() { - stopEnqueuing <- struct{}{} - }) - } else if maxTries != defaultMaxTries { - if strings.Contains(err.Error(), "non 200 status code received") || - strings.Contains(err.Error(), " reason: 'This video contains content from") { - log.Println("This error should not be retried at all") - } else if tryCount >= maxTries { - log.Println("Video failed after " + strconv.Itoa(maxTries) + " retries, moving on") - } else { - log.Println("Retrying") - continue - } - } - } - break - } - } - }() - } - - err = enqueueVideosFromChannel(channelID, &videoQueue, &stopEnqueuing) - if err != nil { - panic(err) - } - close(videoQueue) - - wg.Wait() -} - -func ensureChannelOwnership() error { - channels, err := daemon.ChannelListMine() - if err != nil { - return err - } else if channels == nil { - return fmt.Errorf("no channels") - } - - for _, channel := range *channels { - if channel.Name == lbryChannelName { - return nil - } - } - - resolveResp, err := daemon.Resolve(lbryChannelName) - if err != nil { - return err - } - - channelNotFound := (*resolveResp)[lbryChannelName].Error == nil || strings.Contains(*((*resolveResp)[lbryChannelName].Error), "cannot be resolved") - - if !channelNotFound { - return fmt.Errorf("Channel exists and we don't own it. Pick another channel.") - } - - _, err = daemon.ChannelNew(lbryChannelName, 0.01) - if err != nil { - return err - } - - // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for channel claim to go through") - time.Sleep(wait) - - return nil -} - -func enqueueVideosFromChannel(channelID string, videoChan *chan video, stopEnqueuing *chan struct{}) error { - client := &http.Client{ - Transport: &transport.APIKey{Key: ytAPIKey}, - } - - service, err := youtube.New(client) - if err != nil { - return fmt.Errorf("error creating YouTube service: %v", err) - } - - response, err := service.Channels.List("contentDetails").Id(channelID).Do() - if err != nil { - return fmt.Errorf("error getting channels: %v", err) - } - - if len(response.Items) < 1 { - return fmt.Errorf("youtube channel not found") - } - - if response.Items[0].ContentDetails.RelatedPlaylists == nil { - return fmt.Errorf("no related playlists") - } - - playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads - if playlistID == "" { - return fmt.Errorf("no channel playlist") - } - - videos := []video{} - - nextPageToken := "" - for { - req := service.PlaylistItems.List("snippet"). - PlaylistId(playlistID). - MaxResults(50). - PageToken(nextPageToken) - - playlistResponse, err := req.Do() - if err != nil { - return fmt.Errorf("error getting playlist items: %v", err) - } - - if len(playlistResponse.Items) < 1 { - return fmt.Errorf("playlist items not found") - } - - for _, item := range playlistResponse.Items { - // todo: there's thumbnail info here. why did we need lambda??? - publishedAt, err := time.Parse(time.RFC3339Nano, item.Snippet.PublishedAt) - if err != nil { - return fmt.Errorf("failed to parse time: %v", err.Error()) - } - - // normally we'd send the video into the channel here, but youtube api doesn't have sorting - // so we have to get ALL the videos, then sort them, then send them in - videos = append(videos, video{ - id: item.Snippet.ResourceId.VideoId, - channelID: channelID, - title: item.Snippet.Title, - description: item.Snippet.Description, - channelTitle: item.Snippet.ChannelTitle, - playlistPosition: item.Snippet.Position, - publishedAt: publishedAt, - }) - } - - log.Infoln("Got info for " + strconv.Itoa(len(videos)) + " videos from youtube API") - - nextPageToken = playlistResponse.NextPageToken - if nextPageToken == "" { - break - } - } - - sort.Sort(byPublishedAt(videos)) - //or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) - - for _, v := range videos { - select { - case *videoChan <- v: - case <-*stopEnqueuing: - return nil - } - } - - return nil -} - -func processVideo(v video) error { - log.Println("========================================") - log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") - - conn := redisPool.Get() - defer conn.Close() - - alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, v.id)) - if err != nil && err != redis.ErrNil { - return fmt.Errorf("redis error: %s", err.Error()) - } - if alreadyPublished == redisSyncedVal { - log.Println(v.id + " already published") - return nil - } - - //download and thumbnail can be done in parallel - err = downloadVideo(v) - if err != nil { - return fmt.Errorf("download error: %s", err.Error()) - } - - err = triggerThumbnailSave(v.id) - if err != nil { - return fmt.Errorf("thumbnail error: %s", err.Error()) - } - - err = publish(v, conn) - if err != nil { - return fmt.Errorf("publish error: %s", err.Error()) - } - - return nil -} - -func downloadVideo(v video) error { - verbose := false - videoPath := v.getFilename() - - _, err := os.Stat(videoPath) - if err != nil && !os.IsNotExist(err) { - return err - } else if err == nil { - log.Println(v.id + " already exists at " + videoPath) - return nil - } - - downloader := ytdl.NewYoutube(verbose) - err = downloader.DecodeURL("https://www.youtube.com/watch?v=" + v.id) - if err != nil { - return err - } - err = downloader.StartDownload(videoPath) - if err != nil { - return err - } - log.Debugln("Downloaded " + v.id) - return nil -} - -func triggerThumbnailSave(videoID string) error { - client := &http.Client{Timeout: 30 * time.Second} - - params, err := json.Marshal(map[string]string{"videoid": videoID}) - if err != nil { - return err - } - - request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) - if err != nil { - return err - } - - response, err := client.Do(request) - if err != nil { - return err - } - defer response.Body.Close() - - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return err - } - - var decoded struct { - error int `json:"error"` - url string `json:"url,omitempty"` - message string `json:"message,omitempty"` - } - err = json.Unmarshal(contents, &decoded) - if err != nil { - return err - } - - if decoded.error != 0 { - return fmt.Errorf("error creating thumbnail: " + decoded.message) - } - - log.Debugln("Created thumbnail for " + videoID) - - return nil -} - -func strPtr(s string) *string { return &s } - -func titleToClaimName(name string) string { - maxLen := 40 - reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) - - chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(name, "-"), "-")), "-") - - name = chunks[0] - if len(name) > maxLen { - return name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - - return name -} - -func limitDescription(description string) string { - maxLines := 10 - description = strings.TrimSpace(description) - if strings.Count(description, "\n") < maxLines { - return description - } - return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." -} - -func publish(v video, conn redis.Conn) error { - options := jsonrpc.PublishOptions{ - Title: &v.title, - Author: &v.channelTitle, - Description: strPtr(limitDescription(v.description) + "\nhttps://www.youtube.com/watch?v=" + v.id), - Language: strPtr("en"), - ClaimAddress: &claimAddress, - Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id), - License: strPtr("Copyrighted (contact author)"), - } - if lbryChannelName != "" { - options.ChannelName = &lbryChannelName - } - - _, err := daemon.Publish(titleToClaimName(v.title), v.getFilename(), 0.01, options) - if err != nil { - return err - } - - _, err = redis.Bool(conn.Do("HSET", redisHashKey, v.id, redisSyncedVal)) - if err != nil { - return fmt.Errorf("redis error: %s", err.Error()) - } - - return nil } diff --git a/ytsync/video.go b/ytsync/video.go new file mode 100644 index 0000000..b7622a0 --- /dev/null +++ b/ytsync/video.go @@ -0,0 +1,69 @@ +package ytsync + +import ( + "regexp" + "strings" + "time" +) + +type video struct { + id string + channelID string + channelTitle string + title string + description string + playlistPosition int64 + publishedAt time.Time + dir string +} + +func (v video) getFilename() string { + return v.dir + "/" + v.id + ".mp4" +} + +func (v video) getClaimName() string { + maxLen := 40 + reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) + + chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-") + + name := chunks[0] + if len(name) > maxLen { + return name[:maxLen] + } + + for _, chunk := range chunks[1:] { + tmpName := name + "-" + chunk + if len(tmpName) > maxLen { + if len(name) < 20 { + name = tmpName[:maxLen] + } + break + } + name = tmpName + } + + return name +} + +func (v video) getAbbrevDescription() string { + maxLines := 10 + description := strings.TrimSpace(v.description) + if strings.Count(description, "\n") < maxLines { + return description + } + return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." +} + +// sorting videos +type byPublishedAt []video + +func (a byPublishedAt) Len() int { return len(a) } +func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byPublishedAt) Less(i, j int) bool { return a[i].publishedAt.Before(a[j].publishedAt) } + +type byPlaylistPosition []video + +func (a byPlaylistPosition) Len() int { return len(a) } +func (a byPlaylistPosition) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byPlaylistPosition) Less(i, j int) bool { return a[i].playlistPosition < a[j].playlistPosition } diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go new file mode 100644 index 0000000..da851f1 --- /dev/null +++ b/ytsync/ytsync.go @@ -0,0 +1,408 @@ +package ytsync + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "os" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/lbryio/lbry.go/jsonrpc" + + "github.com/garyburd/redigo/redis" + "github.com/go-errors/errors" + ytdl "github.com/kkdai/youtube" + log "github.com/sirupsen/logrus" + "google.golang.org/api/googleapi/transport" + "google.golang.org/api/youtube/v3" +) + +const ( + redisHashKey = "ytsync" + redisSyncedVal = "t" +) + +// Sync stores the options that control how syncing happens +type Sync struct { + YoutubeAPIKey string + YoutubeChannelID string + LbryChannelName string + StopOnError bool + MaxTries int + ConcurrentVideos int + + daemon *jsonrpc.Client + claimAddress string + videoDirectory string + redisPool *redis.Pool +} + +func (s *Sync) Go() error { + var err error + + s.redisPool = &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 5 * time.Minute, + Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Minute { + return nil + } + _, err := c.Do("PING") + return err + }, + } + + s.videoDirectory, err = ioutil.TempDir("", "ytsync") + if err != nil { + return err + } + + s.daemon = jsonrpc.NewClient("") + + addresses, err := s.daemon.WalletList() + if err != nil { + return err + } else if addresses == nil || len(*addresses) == 0 { + return errors.New("could not find an address in wallet") + } + claimAddress := (*addresses)[0] + if claimAddress == "" { + return errors.New("found blank claim address") + } + + var wg sync.WaitGroup + videoQueue := make(chan video) + + stopEnqueuing := make(chan struct{}) + sendStopEnqueuing := sync.Once{} + + var videoErrored atomic.Value + videoErrored.Store(false) + if s.StopOnError { + log.Println("Will stop publishing if an error is detected") + } + + if s.LbryChannelName != "" { + err = s.ensureChannelOwnership() + if err != nil { + return err + } + } + + for i := 0; i < s.ConcurrentVideos; i++ { + go func() { + wg.Add(1) + defer wg.Done() + + for { + v, more := <-videoQueue + if !more { + return + } + if s.StopOnError && videoErrored.Load().(bool) { + log.Println("Video errored. Exiting") + return + } + + tryCount := 0 + for { + tryCount++ + err := s.processVideo(v) + + if err != nil { + log.Errorln("error processing video: " + err.Error()) + if s.StopOnError { + videoErrored.Store(true) + sendStopEnqueuing.Do(func() { + stopEnqueuing <- struct{}{} + }) + } else if s.MaxTries > 1 { + if strings.Contains(err.Error(), "non 200 status code received") || + strings.Contains(err.Error(), " reason: 'This video contains content from") { + log.Println("This error should not be retried at all") + } else if tryCount >= s.MaxTries { + log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, moving on") + } else { + log.Println("Retrying") + continue + } + } + } + break + } + } + }() + } + + err = s.enqueueVideosFromChannel(s.YoutubeChannelID, &videoQueue, &stopEnqueuing) + close(videoQueue) + wg.Wait() + return err +} + +func (s *Sync) ensureChannelOwnership() error { + channels, err := s.daemon.ChannelListMine() + if err != nil { + return err + } else if channels == nil { + return errors.New("no channels") + } + + for _, channel := range *channels { + if channel.Name == s.LbryChannelName { + return nil + } + } + + resolveResp, err := s.daemon.Resolve(s.LbryChannelName) + if err != nil { + return err + } + + channelNotFound := (*resolveResp)[s.LbryChannelName].Error == nil || strings.Contains(*((*resolveResp)[s.LbryChannelName].Error), "cannot be resolved") + + if !channelNotFound { + return errors.New("Channel exists and we don't own it. Pick another channel.") + } + + _, err = s.daemon.ChannelNew(s.LbryChannelName, 0.01) + if err != nil { + return err + } + + // niko's code says "unfortunately the queues in the daemon are not yet merged so we must give it some time for the channel to go through" + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for channel claim to go through") + time.Sleep(wait) + + return nil +} + +func (s *Sync) enqueueVideosFromChannel(channelID string, videoChan *chan video, stopEnqueuing *chan struct{}) error { + client := &http.Client{ + Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, + } + + service, err := youtube.New(client) + if err != nil { + return errors.WrapPrefix(err, "error creating YouTube service", 0) + } + + response, err := service.Channels.List("contentDetails").Id(channelID).Do() + if err != nil { + return errors.WrapPrefix(err, "error getting channels", 0) + } + + if len(response.Items) < 1 { + return errors.New("youtube channel not found") + } + + if response.Items[0].ContentDetails.RelatedPlaylists == nil { + return errors.New("no related playlists") + } + + playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads + if playlistID == "" { + return errors.New("no channel playlist") + } + + videos := []video{} + + nextPageToken := "" + for { + req := service.PlaylistItems.List("snippet"). + PlaylistId(playlistID). + MaxResults(50). + PageToken(nextPageToken) + + playlistResponse, err := req.Do() + if err != nil { + return errors.WrapPrefix(err, "error getting playlist items", 0) + } + + if len(playlistResponse.Items) < 1 { + return errors.New("playlist items not found") + } + + for _, item := range playlistResponse.Items { + // todo: there's thumbnail info here. why did we need lambda??? + publishedAt, err := time.Parse(time.RFC3339Nano, item.Snippet.PublishedAt) + if err != nil { + return errors.WrapPrefix(err, "failed to parse time", 0) + } + + // normally we'd send the video into the channel here, but youtube api doesn't have sorting + // so we have to get ALL the videos, then sort them, then send them in + videos = append(videos, video{ + id: item.Snippet.ResourceId.VideoId, + channelID: channelID, + title: item.Snippet.Title, + description: item.Snippet.Description, + channelTitle: item.Snippet.ChannelTitle, + playlistPosition: item.Snippet.Position, + publishedAt: publishedAt, + dir: s.videoDirectory, + }) + } + + log.Infoln("Got info for " + strconv.Itoa(len(videos)) + " videos from youtube API") + + nextPageToken = playlistResponse.NextPageToken + if nextPageToken == "" { + break + } + } + + sort.Sort(byPublishedAt(videos)) + //or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) + + for _, v := range videos { + select { + case *videoChan <- v: + case <-*stopEnqueuing: + return nil + } + } + + return nil +} + +func (s *Sync) processVideo(v video) error { + log.Println("========================================") + log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)") + + conn := s.redisPool.Get() + defer conn.Close() + + alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, v.id)) + if err != nil && err != redis.ErrNil { + return errors.WrapPrefix(err, "redis error", 0) + + } + if alreadyPublished == redisSyncedVal { + log.Println(v.id + " already published") + return nil + } + + //download and thumbnail can be done in parallel + err = downloadVideo(v) + if err != nil { + return errors.WrapPrefix(err, "download error", 0) + } + + err = triggerThumbnailSave(v.id) + if err != nil { + return errors.WrapPrefix(err, "thumbnail error", 0) + } + + err = s.publish(v, conn) + if err != nil { + return errors.WrapPrefix(err, "publish error", 0) + } + + return nil +} + +func downloadVideo(v video) error { + verbose := false + videoPath := v.getFilename() + + _, err := os.Stat(videoPath) + if err != nil && !os.IsNotExist(err) { + return err + } else if err == nil { + log.Println(v.id + " already exists at " + videoPath) + return nil + } + + downloader := ytdl.NewYoutube(verbose) + err = downloader.DecodeURL("https://www.youtube.com/watch?v=" + v.id) + if err != nil { + return err + } + err = downloader.StartDownload(videoPath) + if err != nil { + return err + } + log.Debugln("Downloaded " + v.id) + return nil +} + +func triggerThumbnailSave(videoID string) error { + client := &http.Client{Timeout: 30 * time.Second} + + params, err := json.Marshal(map[string]string{"videoid": videoID}) + if err != nil { + return err + } + + request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) + if err != nil { + return err + } + + response, err := client.Do(request) + if err != nil { + return err + } + defer response.Body.Close() + + contents, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + + var decoded struct { + error int `json:"error"` + url string `json:"url,omitempty"` + message string `json:"message,omitempty"` + } + err = json.Unmarshal(contents, &decoded) + if err != nil { + return err + } + + if decoded.error != 0 { + return errors.New("error creating thumbnail: " + decoded.message) + } + + log.Debugln("Created thumbnail for " + videoID) + + return nil +} + +func strPtr(s string) *string { return &s } + +func (s *Sync) publish(v video, conn redis.Conn) error { + options := jsonrpc.PublishOptions{ + Title: &v.title, + Author: &v.channelTitle, + Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id), + Language: strPtr("en"), + ClaimAddress: &s.claimAddress, + Thumbnail: strPtr("http://berk.ninja/thumbnails/" + v.id), + License: strPtr("Copyrighted (contact author)"), + } + if s.LbryChannelName != "" { + options.ChannelName = &s.LbryChannelName + } + + _, err := s.daemon.Publish(v.getClaimName(), v.getFilename(), 0.01, options) + if err != nil { + return err + } + + _, err = redis.Bool(conn.Do("HSET", redisHashKey, v.id, redisSyncedVal)) + if err != nil { + return errors.New("redis error: " + err.Error()) + } + + return nil +}